1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31use crate::location::dynamic::LocationId;
32use crate::location::{LocationKey, NetworkHint};
33
34pub mod backtrace;
35use backtrace::Backtrace;
36
/// Newtype around a boxed [`syn::Expr`].
///
/// The trait impls in this file give it a raw token-stream `Debug` rendering, a
/// `q!(...)`-wrapped `Display` rendering, and a string-based `serde::Serialize` impl.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
42
43impl serde::Serialize for DebugExpr {
44 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
45 serializer.serialize_str(&self.to_string())
46 }
47}
48
49impl From<syn::Expr> for DebugExpr {
50 fn from(expr: syn::Expr) -> Self {
51 Self(Box::new(expr))
52 }
53}
54
55impl Deref for DebugExpr {
56 type Target = syn::Expr;
57
58 fn deref(&self) -> &Self::Target {
59 &self.0
60 }
61}
62
63impl ToTokens for DebugExpr {
64 fn to_tokens(&self, tokens: &mut TokenStream) {
65 self.0.to_tokens(tokens);
66 }
67}
68
69impl Debug for DebugExpr {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 write!(f, "{}", self.0.to_token_stream())
72 }
73}
74
75impl Display for DebugExpr {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 let original = self.0.as_ref().clone();
78 let simplified = simplify_q_macro(original);
79
80 write!(f, "q!({})", quote::quote!(#simplified))
83 }
84}
85
86fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
88 let mut simplifier = QMacroSimplifier::new();
91 simplifier.visit_expr_mut(&mut expr);
92
93 if let Some(simplified) = simplifier.simplified_result {
95 simplified
96 } else {
97 expr
98 }
99}
100
/// Mutable `syn` visitor that looks for a `stageleft::runtime_support::*_type_hint*` call
/// and records the closure found among its arguments.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The extracted expression, set by `visit_expr_mut` the first time a matching call
    /// with a closure argument is found; `None` until then.
    pub simplified_result: Option<syn::Expr>,
}
106
107impl QMacroSimplifier {
108 pub fn new() -> Self {
109 Self::default()
110 }
111}
112
113impl VisitMut for QMacroSimplifier {
114 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
115 if self.simplified_result.is_some() {
117 return;
118 }
119
120 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
121 && self.is_stageleft_runtime_support_call(&path_expr.path)
123 && let Some(closure) = self.extract_closure_from_args(&call.args)
125 {
126 self.simplified_result = Some(closure);
127 return;
128 }
129
130 syn::visit_mut::visit_expr_mut(self, expr);
133 }
134}
135
136impl QMacroSimplifier {
137 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
138 if let Some(last_segment) = path.segments.last() {
140 let fn_name = last_segment.ident.to_string();
141 fn_name.contains("_type_hint")
143 && path.segments.len() > 2
144 && path.segments[0].ident == "stageleft"
145 && path.segments[1].ident == "runtime_support"
146 } else {
147 false
148 }
149 }
150
151 fn extract_closure_from_args(
152 &self,
153 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
154 ) -> Option<syn::Expr> {
155 for arg in args {
157 if let syn::Expr::Closure(_) = arg {
158 return Some(arg.clone());
159 }
160 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
162 return Some(closure_expr);
163 }
164 }
165 None
166 }
167
168 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
169 let mut visitor = ClosureFinder {
170 found_closure: None,
171 prefer_inner_blocks: true,
172 };
173 visitor.visit_expr(expr);
174 visitor.found_closure
175 }
176}
177
/// Read-only `syn` visitor that records the first closure-bearing expression it meets.
struct ClosureFinder {
    /// The recorded expression; once set, further visits return immediately.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block expression first checks its directly nested block statements
    /// and, if one of them contains a closure, records that whole nested block.
    prefer_inner_blocks: bool,
}
183
184impl<'ast> Visit<'ast> for ClosureFinder {
185 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
186 if self.found_closure.is_some() {
188 return;
189 }
190
191 match expr {
192 syn::Expr::Closure(_) => {
193 self.found_closure = Some(expr.clone());
194 }
195 syn::Expr::Block(block) if self.prefer_inner_blocks => {
196 for stmt in &block.block.stmts {
198 if let syn::Stmt::Expr(stmt_expr, _) = stmt
199 && let syn::Expr::Block(_) = stmt_expr
200 {
201 let mut inner_visitor = ClosureFinder {
203 found_closure: None,
204 prefer_inner_blocks: false, };
206 inner_visitor.visit_expr(stmt_expr);
207 if inner_visitor.found_closure.is_some() {
208 self.found_closure = Some(stmt_expr.clone());
210 return;
211 }
212 }
213 }
214
215 visit::visit_expr(self, expr);
217
218 if self.found_closure.is_some() {
221 }
223 }
224 _ => {
225 visit::visit_expr(self, expr);
227 }
228 }
229 }
230}
231
/// Newtype around a boxed [`syn::Type`].
///
/// The trait impls in this file render it as its token stream for both `Debug` and
/// `serde::Serialize`.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
237
238impl From<syn::Type> for DebugType {
239 fn from(t: syn::Type) -> Self {
240 Self(Box::new(t))
241 }
242}
243
244impl Deref for DebugType {
245 type Target = syn::Type;
246
247 fn deref(&self) -> &Self::Target {
248 &self.0
249 }
250}
251
252impl ToTokens for DebugType {
253 fn to_tokens(&self, tokens: &mut TokenStream) {
254 self.0.to_tokens(tokens);
255 }
256}
257
258impl Debug for DebugType {
259 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260 write!(f, "{}", self.0.to_token_stream())
261 }
262}
263
264impl serde::Serialize for DebugType {
265 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
266 serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
267 }
268}
269
270fn serialize_backtrace_as_span<S: serde::Serializer>(
271 backtrace: &Backtrace,
272 serializer: S,
273) -> Result<S::Ok, S::Error> {
274 match backtrace.format_span() {
275 Some(span) => serializer.serialize_some(&span),
276 None => serializer.serialize_none(),
277 }
278}
279
280fn serialize_ident<S: serde::Serializer>(
281 ident: &syn::Ident,
282 serializer: S,
283) -> Result<S::Ok, S::Error> {
284 serializer.serialize_str(&ident.to_string())
285}
286
/// Instantiation state of a network-facing IR element.
pub enum DebugInstantiate {
    /// Not yet instantiated; `compile_network` replaces this with `Finalized`.
    Building,
    /// Instantiated: holds the sink/source expressions and the pending connect closure.
    Finalized(Box<DebugInstantiateFinalized>),
}
291
292impl serde::Serialize for DebugInstantiate {
293 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
294 match self {
295 DebugInstantiate::Building => {
296 serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
297 }
298 DebugInstantiate::Finalized(_) => {
299 panic!(
300 "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
301 )
302 }
303 }
304 }
305}
306
/// Payload of [`DebugInstantiate::Finalized`].
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression used as the sending side when DFIR is emitted.
    sink: syn::Expr,
    /// Expression used as the receiving side when DFIR is emitted.
    source: syn::Expr,
    /// One-shot closure run by `connect_network`, which `take()`s it out of the option.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
319
320impl From<DebugInstantiateFinalized> for DebugInstantiate {
321 fn from(f: DebugInstantiateFinalized) -> Self {
322 Self::Finalized(Box::new(f))
323 }
324}
325
326impl Debug for DebugInstantiate {
327 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
328 write!(f, "<network instantiate>")
329 }
330}
331
impl Hash for DebugInstantiate {
    /// Feeds nothing into the hasher: the body is empty, so every value of this type
    /// contributes identically to a containing hash.
    fn hash<H: Hasher>(&self, _state: &mut H) {
    }
}
337
338impl Clone for DebugInstantiate {
339 fn clone(&self) -> Self {
340 match self {
341 DebugInstantiate::Building => DebugInstantiate::Building,
342 DebugInstantiate::Finalized(_) => {
343 panic!("DebugInstantiate::Finalized should not be cloned")
344 }
345 }
346 }
347}
348
/// Resolution state of a [`HydroSource::ClusterMembers`] source.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum ClusterMembersState {
    /// Not yet resolved; `compile_network` replaces this with one of the other variants.
    Uninit,
    /// Set by `compile_network` the first time a given (location, cluster) pair is seen;
    /// holds the spliced membership-stream expression.
    Stream(DebugExpr),
    /// Set by `compile_network` when the same (location, cluster) pair was already seen;
    /// holds the observing location's root and the cluster's location id.
    Tee(LocationId, LocationId),
}
369
/// The kinds of source a `HydroNode::Source` can carry.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum HydroSource {
    /// Source described by an expression. NOTE(review): presumably an async stream — confirm.
    Stream(DebugExpr),
    /// NOTE(review): semantics are not visible in this part of the file.
    ExternalNetwork(),
    /// Source described by an expression. NOTE(review): presumably an iterator — confirm.
    Iter(DebugExpr),
    /// NOTE(review): semantics are not visible in this part of the file.
    Spin(),
    /// Membership of the cluster identified by the `LocationId`, together with its
    /// resolution state (filled in by `compile_network`).
    ClusterMembers(LocationId, ClusterMembersState),
    /// Embedded input identified by `ident`; `compile_network` requires a `Stream`
    /// collection kind and registers it as an embedded stream input.
    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
    /// Embedded input identified by `ident`; `compile_network` requires a `Singleton`
    /// collection kind and registers it as an embedded singleton input.
    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
}
381
/// Receiver of the DFIR statements produced while emitting the IR.
///
/// `HydroRoot::emit` drives an implementation through `&mut dyn DfirBuilder`. The
/// `SecondaryMap<LocationKey, FlatGraphBuilder>` implementation in this file appends
/// statements to one `FlatGraphBuilder` per root location.
#[cfg(feature = "build")]
pub trait DfirBuilder {
    /// Implementation-specific flag; the `SecondaryMap` implementation returns `false`.
    /// NOTE(review): how callers use this flag is not visible here — confirm.
    fn singleton_intermediates(&self) -> bool;

    /// Returns the graph builder that statements for `location` should be added to.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Connects `in_ident` (at `in_location`, of kind `in_kind`) to `out_ident` at
    /// `out_location`. NOTE(review): presumably models entering a tick/batch — confirm.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Connects `in_ident` (at `in_location`) to `out_ident` at `out_location`.
    /// NOTE(review): presumably models leaving a tick — confirm.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Connects `in_ident` to `out_ident` at the start of an atomic region.
    /// NOTE(review): "atomic" semantics are not visible here — confirm.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Connects `in_ident` to `out_ident` at the end of an atomic region.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Connects `in_ident` (kind `in_kind`) to `out_ident` (kind `out_kind`) at `location`.
    /// NOTE(review): the meaning of `trusted` is not visible here — confirm.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Merges `first_ident` and `second_ident` into `out_ident` at `location`, tagging
    /// the generated statements with `operator_tag` when given.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    );

    /// Emits both halves of a network hop: at `from`, `input_ident` is (optionally
    /// mapped through `serialize` and) sent into `sink`; at `to`, `out_ident` is fed from
    /// `source` (optionally mapped through `deserialize`). `tag_id` labels the statements.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits a receiver at `on` that feeds `out_ident` from `source_expr`, optionally
    /// mapped through `deserialize`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Emits a sender at `on` that pushes `input_ident` into `sink_expr`, optionally
    /// mapped through `serialize` first.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    );
}
487
/// [`DfirBuilder`] backed by one `FlatGraphBuilder` per root location key.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    /// Always `false` for this implementation.
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Returns the builder stored under the root key of `location`, creating a default
    /// one if the slot is empty.
    ///
    /// # Panics
    ///
    /// Panics with "location was removed" if the map reports no entry for that key.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        let slot = self.entry(root_key).expect("location was removed");
        slot.or_default()
    }

    /// Wires `in_ident` to `out_ident`. Bounded singleton-like inputs (`Singleton`,
    /// `Optional`, `KeyedSingleton`) go through `persist::<'static>()` and must sit at a
    /// top-level location; everything else is a plain alias.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        let bounded_single_value = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );

        if !bounded_single_value {
            builder.add_dfir(
                parse_quote! {
                    #out_ident = #in_ident;
                },
                None,
                None,
            );
            return;
        }

        assert!(in_location.is_top_level());
        builder.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    /// Plain alias from `in_ident` to `out_ident` in the builder of `in_location`'s root.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Plain alias from `in_ident` to `out_ident` in the builder of `in_location`'s root.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Plain alias from `in_ident` to `out_ident` in the builder of `in_location`'s root.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Plain alias from `in_ident` to `out_ident` in the builder for `location`.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Feeds both inputs into a `union()` named `out_ident` (ports 0 and 1).
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = union();
                #first_ident -> [0]#out_ident;
                #second_ident -> [1]#out_ident;
            },
            None,
            operator_tag,
        );
    }

    /// Emits the sending half at `from` (tagged `send<tag_id>`) and the receiving half at
    /// `to` (tagged `recv<tag_id>`), inserting `map(...)` stages only for the
    /// (de)serializers that are present.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        let sender_builder = self.get_dfir_mut(from);
        let send_tag = format!("send{tag_id}");
        match serialize {
            Some(serialize_pipeline) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                    },
                    None,
                    Some(&send_tag),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(&send_tag),
                );
            }
        }

        let receiver_builder = self.get_dfir_mut(to);
        let recv_tag = format!("recv{tag_id}");
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
        }
    }

    /// Emits `out_ident = source_stream(source_expr)` at `on`, followed by
    /// `map(deserialize)` when a deserializer is given; tagged `recv<tag_id>`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let receiver_builder = self.get_dfir_mut(on);
        let recv_tag = format!("recv{tag_id}");
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
        }
    }

    /// Emits `input_ident -> dest_sink(sink_expr)` at `on`, with a `map(serialize)` stage
    /// in between when a serializer is given; tagged `send<tag_id>`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let sender_builder = self.get_dfir_mut(on);
        let send_tag = format!("send{tag_id}");
        match serialize {
            Some(serialize_fn) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&send_tag),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&send_tag),
                );
            }
        }
    }
}
743
/// What `emit_core` does at each root/node: either generate DFIR into a
/// [`DfirBuilder`], or hand the element to user callbacks.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Generate DFIR statements into the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Invoke `L` for roots and `N` for nodes; the `&mut usize` is the statement counter.
    Callback(L, N),
}
753
/// A root (terminal element) of the IR; every variant consumes exactly one `input` node.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroRoot {
    /// Emitted as `input -> for_each(f)`.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends `input` to the external identified by `to_external_key` / `to_port_id`.
    /// `instantiate_fn` is finalized by `compile_network`; `serialize_fn`, when present,
    /// is applied before the sink.
    SendExternal {
        to_external_key: LocationKey,
        to_port_id: ExternalPortId,
        to_many: bool,
        unpaired: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Emitted as `input -> dest_sink(sink)`.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// NOTE(review): presumably closes the cycle identified by `cycle_id` — the
    /// emitting code is not visible here; confirm.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Output registered under `ident` via `register_embedded_output` in
    /// `compile_network`, which requires a `Stream` input on a process or cluster.
    EmbeddedOutput {
        #[serde(serialize_with = "serialize_ident")]
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// NOTE(review): carries only an input; emission behavior is not visible here.
    Null {
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
795
796impl HydroRoot {
797 #[cfg(feature = "build")]
798 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
799 pub fn compile_network<'a, D>(
800 &mut self,
801 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
802 seen_tees: &mut SeenSharedNodes,
803 seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
804 processes: &SparseSecondaryMap<LocationKey, D::Process>,
805 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
806 externals: &SparseSecondaryMap<LocationKey, D::External>,
807 env: &mut D::InstantiateEnv,
808 ) where
809 D: Deploy<'a>,
810 {
811 let refcell_extra_stmts = RefCell::new(extra_stmts);
812 let refcell_env = RefCell::new(env);
813 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
814 self.transform_bottom_up(
815 &mut |l| {
816 if let HydroRoot::SendExternal {
817 input,
818 to_external_key,
819 to_port_id,
820 to_many,
821 unpaired,
822 instantiate_fn,
823 ..
824 } = l
825 {
826 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
827 DebugInstantiate::Building => {
828 let to_node = externals
829 .get(*to_external_key)
830 .unwrap_or_else(|| {
831 panic!("A external used in the graph was not instantiated: {}", to_external_key)
832 })
833 .clone();
834
835 match input.metadata().location_id.root() {
836 &LocationId::Process(process_key) => {
837 if *to_many {
838 (
839 (
840 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
841 parse_quote!(DUMMY),
842 ),
843 Box::new(|| {}) as Box<dyn FnOnce()>,
844 )
845 } else {
846 let from_node = processes
847 .get(process_key)
848 .unwrap_or_else(|| {
849 panic!("A process used in the graph was not instantiated: {}", process_key)
850 })
851 .clone();
852
853 let sink_port = from_node.next_port();
854 let source_port = to_node.next_port();
855
856 if *unpaired {
857 use stageleft::quote_type;
858 use tokio_util::codec::LengthDelimitedCodec;
859
860 to_node.register(*to_port_id, source_port.clone());
861
862 let _ = D::e2o_source(
863 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
864 &to_node, &source_port,
865 &from_node, &sink_port,
866 "e_type::<LengthDelimitedCodec>(),
867 format!("{}_{}", *to_external_key, *to_port_id)
868 );
869 }
870
871 (
872 (
873 D::o2e_sink(
874 &from_node,
875 &sink_port,
876 &to_node,
877 &source_port,
878 format!("{}_{}", *to_external_key, *to_port_id)
879 ),
880 parse_quote!(DUMMY),
881 ),
882 if *unpaired {
883 D::e2o_connect(
884 &to_node,
885 &source_port,
886 &from_node,
887 &sink_port,
888 *to_many,
889 NetworkHint::Auto,
890 )
891 } else {
892 Box::new(|| {}) as Box<dyn FnOnce()>
893 },
894 )
895 }
896 }
897 LocationId::Cluster(cluster_key) => {
898 let from_node = clusters
899 .get(*cluster_key)
900 .unwrap_or_else(|| {
901 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
902 })
903 .clone();
904
905 let sink_port = from_node.next_port();
906 let source_port = to_node.next_port();
907
908 if *unpaired {
909 to_node.register(*to_port_id, source_port.clone());
910 }
911
912 (
913 (
914 D::m2e_sink(
915 &from_node,
916 &sink_port,
917 &to_node,
918 &source_port,
919 format!("{}_{}", *to_external_key, *to_port_id)
920 ),
921 parse_quote!(DUMMY),
922 ),
923 Box::new(|| {}) as Box<dyn FnOnce()>,
924 )
925 }
926 _ => panic!()
927 }
928 },
929
930 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
931 };
932
933 *instantiate_fn = DebugInstantiateFinalized {
934 sink: sink_expr,
935 source: source_expr,
936 connect_fn: Some(connect_fn),
937 }
938 .into();
939 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
940 let element_type = match &input.metadata().collection_kind {
941 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
942 _ => panic!("Embedded output must have Stream collection kind"),
943 };
944 let location_key = match input.metadata().location_id.root() {
945 LocationId::Process(key) | LocationId::Cluster(key) => *key,
946 _ => panic!("Embedded output must be on a process or cluster"),
947 };
948 D::register_embedded_output(
949 &mut refcell_env.borrow_mut(),
950 location_key,
951 ident,
952 &element_type,
953 );
954 }
955 },
956 &mut |n| {
957 if let HydroNode::Network {
958 name,
959 networking_info,
960 input,
961 instantiate_fn,
962 metadata,
963 ..
964 } = n
965 {
966 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
967 DebugInstantiate::Building => instantiate_network::<D>(
968 &mut refcell_env.borrow_mut(),
969 input.metadata().location_id.root(),
970 metadata.location_id.root(),
971 processes,
972 clusters,
973 name.as_deref(),
974 networking_info,
975 ),
976
977 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
978 };
979
980 *instantiate_fn = DebugInstantiateFinalized {
981 sink: sink_expr,
982 source: source_expr,
983 connect_fn: Some(connect_fn),
984 }
985 .into();
986 } else if let HydroNode::ExternalInput {
987 from_external_key,
988 from_port_id,
989 from_many,
990 codec_type,
991 port_hint,
992 instantiate_fn,
993 metadata,
994 ..
995 } = n
996 {
997 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
998 DebugInstantiate::Building => {
999 let from_node = externals
1000 .get(*from_external_key)
1001 .unwrap_or_else(|| {
1002 panic!(
1003 "A external used in the graph was not instantiated: {}",
1004 from_external_key,
1005 )
1006 })
1007 .clone();
1008
1009 match metadata.location_id.root() {
1010 &LocationId::Process(process_key) => {
1011 let to_node = processes
1012 .get(process_key)
1013 .unwrap_or_else(|| {
1014 panic!("A process used in the graph was not instantiated: {}", process_key)
1015 })
1016 .clone();
1017
1018 let sink_port = from_node.next_port();
1019 let source_port = to_node.next_port();
1020
1021 from_node.register(*from_port_id, sink_port.clone());
1022
1023 (
1024 (
1025 parse_quote!(DUMMY),
1026 if *from_many {
1027 D::e2o_many_source(
1028 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1029 &to_node, &source_port,
1030 codec_type.0.as_ref(),
1031 format!("{}_{}", *from_external_key, *from_port_id)
1032 )
1033 } else {
1034 D::e2o_source(
1035 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1036 &from_node, &sink_port,
1037 &to_node, &source_port,
1038 codec_type.0.as_ref(),
1039 format!("{}_{}", *from_external_key, *from_port_id)
1040 )
1041 },
1042 ),
1043 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1044 )
1045 }
1046 LocationId::Cluster(cluster_key) => {
1047 let to_node = clusters
1048 .get(*cluster_key)
1049 .unwrap_or_else(|| {
1050 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1051 })
1052 .clone();
1053
1054 let sink_port = from_node.next_port();
1055 let source_port = to_node.next_port();
1056
1057 from_node.register(*from_port_id, sink_port.clone());
1058
1059 (
1060 (
1061 parse_quote!(DUMMY),
1062 D::e2m_source(
1063 refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1064 &from_node, &sink_port,
1065 &to_node, &source_port,
1066 codec_type.0.as_ref(),
1067 format!("{}_{}", *from_external_key, *from_port_id)
1068 ),
1069 ),
1070 D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1071 )
1072 }
1073 _ => panic!()
1074 }
1075 },
1076
1077 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1078 };
1079
1080 *instantiate_fn = DebugInstantiateFinalized {
1081 sink: sink_expr,
1082 source: source_expr,
1083 connect_fn: Some(connect_fn),
1084 }
1085 .into();
1086 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1087 let element_type = match &metadata.collection_kind {
1088 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1089 _ => panic!("Embedded source must have Stream collection kind"),
1090 };
1091 let location_key = match metadata.location_id.root() {
1092 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1093 _ => panic!("Embedded source must be on a process or cluster"),
1094 };
1095 D::register_embedded_stream_input(
1096 &mut refcell_env.borrow_mut(),
1097 location_key,
1098 ident,
1099 &element_type,
1100 );
1101 } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1102 let element_type = match &metadata.collection_kind {
1103 CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1104 _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1105 };
1106 let location_key = match metadata.location_id.root() {
1107 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1108 _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1109 };
1110 D::register_embedded_singleton_input(
1111 &mut refcell_env.borrow_mut(),
1112 location_key,
1113 ident,
1114 &element_type,
1115 );
1116 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1117 match state {
1118 ClusterMembersState::Uninit => {
1119 let at_location = metadata.location_id.root().clone();
1120 let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
1121 if refcell_seen_cluster_members.borrow_mut().insert(key) {
1122 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1124 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1125 &(),
1126 );
1127 *state = ClusterMembersState::Stream(expr.into());
1128 } else {
1129 *state = ClusterMembersState::Tee(at_location, location_id.clone());
1131 }
1132 }
1133 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1134 panic!("cluster members already finalized");
1135 }
1136 }
1137 }
1138 },
1139 seen_tees,
1140 false,
1141 );
1142 }
1143
1144 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1145 self.transform_bottom_up(
1146 &mut |l| {
1147 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1148 match instantiate_fn {
1149 DebugInstantiate::Building => panic!("network not built"),
1150
1151 DebugInstantiate::Finalized(finalized) => {
1152 (finalized.connect_fn.take().unwrap())();
1153 }
1154 }
1155 }
1156 },
1157 &mut |n| {
1158 if let HydroNode::Network { instantiate_fn, .. }
1159 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1160 {
1161 match instantiate_fn {
1162 DebugInstantiate::Building => panic!("network not built"),
1163
1164 DebugInstantiate::Finalized(finalized) => {
1165 (finalized.connect_fn.take().unwrap())();
1166 }
1167 }
1168 }
1169 },
1170 seen_tees,
1171 false,
1172 );
1173 }
1174
1175 pub fn transform_bottom_up(
1176 &mut self,
1177 transform_root: &mut impl FnMut(&mut HydroRoot),
1178 transform_node: &mut impl FnMut(&mut HydroNode),
1179 seen_tees: &mut SeenSharedNodes,
1180 check_well_formed: bool,
1181 ) {
1182 self.transform_children(
1183 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1184 seen_tees,
1185 );
1186
1187 transform_root(self);
1188 }
1189
1190 pub fn transform_children(
1191 &mut self,
1192 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1193 seen_tees: &mut SeenSharedNodes,
1194 ) {
1195 match self {
1196 HydroRoot::ForEach { input, .. }
1197 | HydroRoot::SendExternal { input, .. }
1198 | HydroRoot::DestSink { input, .. }
1199 | HydroRoot::CycleSink { input, .. }
1200 | HydroRoot::EmbeddedOutput { input, .. }
1201 | HydroRoot::Null { input, .. } => {
1202 transform(input, seen_tees);
1203 }
1204 }
1205 }
1206
1207 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1208 match self {
1209 HydroRoot::ForEach {
1210 f,
1211 input,
1212 op_metadata,
1213 } => HydroRoot::ForEach {
1214 f: f.clone(),
1215 input: Box::new(input.deep_clone(seen_tees)),
1216 op_metadata: op_metadata.clone(),
1217 },
1218 HydroRoot::SendExternal {
1219 to_external_key,
1220 to_port_id,
1221 to_many,
1222 unpaired,
1223 serialize_fn,
1224 instantiate_fn,
1225 input,
1226 op_metadata,
1227 } => HydroRoot::SendExternal {
1228 to_external_key: *to_external_key,
1229 to_port_id: *to_port_id,
1230 to_many: *to_many,
1231 unpaired: *unpaired,
1232 serialize_fn: serialize_fn.clone(),
1233 instantiate_fn: instantiate_fn.clone(),
1234 input: Box::new(input.deep_clone(seen_tees)),
1235 op_metadata: op_metadata.clone(),
1236 },
1237 HydroRoot::DestSink {
1238 sink,
1239 input,
1240 op_metadata,
1241 } => HydroRoot::DestSink {
1242 sink: sink.clone(),
1243 input: Box::new(input.deep_clone(seen_tees)),
1244 op_metadata: op_metadata.clone(),
1245 },
1246 HydroRoot::CycleSink {
1247 cycle_id,
1248 input,
1249 op_metadata,
1250 } => HydroRoot::CycleSink {
1251 cycle_id: *cycle_id,
1252 input: Box::new(input.deep_clone(seen_tees)),
1253 op_metadata: op_metadata.clone(),
1254 },
1255 HydroRoot::EmbeddedOutput {
1256 ident,
1257 input,
1258 op_metadata,
1259 } => HydroRoot::EmbeddedOutput {
1260 ident: ident.clone(),
1261 input: Box::new(input.deep_clone(seen_tees)),
1262 op_metadata: op_metadata.clone(),
1263 },
1264 HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1265 input: Box::new(input.deep_clone(seen_tees)),
1266 op_metadata: op_metadata.clone(),
1267 },
1268 }
1269 }
1270
1271 #[cfg(feature = "build")]
1272 pub fn emit(
1273 &mut self,
1274 graph_builders: &mut dyn DfirBuilder,
1275 seen_tees: &mut SeenSharedNodes,
1276 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1277 next_stmt_id: &mut usize,
1278 ) {
1279 self.emit_core(
1280 &mut BuildersOrCallback::<
1281 fn(&mut HydroRoot, &mut usize),
1282 fn(&mut HydroNode, &mut usize),
1283 >::Builders(graph_builders),
1284 seen_tees,
1285 built_tees,
1286 next_stmt_id,
1287 );
1288 }
1289
/// Emits this root after first emitting everything upstream of it.
///
/// Every arm starts by calling `emit_core` on the root's `input`, which yields
/// the identifier (`input_ident`) that the root's own statement consumes. What
/// happens next depends on `builders_or_callback`:
///
/// * `Builders` — a DFIR statement for the root is added to the builder that
///   belongs to the input's `location_id`.
/// * `Callback` — the root callback is invoked with `self` and the current
///   statement id; nothing is added to any builder.
///
/// `next_stmt_id` is incremented once after every root *except*
/// `HydroRoot::CycleSink`, which neither reports itself to the callback nor
/// consumes a statement id.
///
/// `seen_tees` and `built_tees` are not inspected here; they are only passed
/// through to the input's `emit_core`.
#[cfg(feature = "build")]
pub fn emit_core(
    &mut self,
    builders_or_callback: &mut BuildersOrCallback<
        impl FnMut(&mut HydroRoot, &mut usize),
        impl FnMut(&mut HydroNode, &mut usize),
    >,
    seen_tees: &mut SeenSharedNodes,
    built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
    next_stmt_id: &mut usize,
) {
    match self {
        // `input -> for_each(f)`
        HydroRoot::ForEach { f, input, .. } => {
            let input_ident =
                input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

            match builders_or_callback {
                BuildersOrCallback::Builders(graph_builders) => {
                    graph_builders
                        .get_dfir_mut(&input.metadata().location_id)
                        .add_dfir(
                            parse_quote! {
                                #input_ident -> for_each(#f);
                            },
                            None,
                            Some(&next_stmt_id.to_string()),
                        );
                }
                BuildersOrCallback::Callback(leaf_callback, _) => {
                    leaf_callback(self, next_stmt_id);
                }
            }

            *next_stmt_id += 1;
        }

        // Output to an external process, delegated to `create_external_output`.
        HydroRoot::SendExternal {
            serialize_fn,
            instantiate_fn,
            input,
            ..
        } => {
            let input_ident =
                input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

            match builders_or_callback {
                BuildersOrCallback::Builders(graph_builders) => {
                    // Only the sink half of the pair is used by this root. While the
                    // instantiation is still `Building`, dummy expressions stand in.
                    let (sink_expr, _) = match instantiate_fn {
                        DebugInstantiate::Building => (
                            syn::parse_quote!(DUMMY_SINK),
                            syn::parse_quote!(DUMMY_SOURCE),
                        ),

                        DebugInstantiate::Finalized(finalized) => {
                            (finalized.sink.clone(), finalized.source.clone())
                        }
                    };

                    graph_builders.create_external_output(
                        &input.metadata().location_id,
                        sink_expr,
                        &input_ident,
                        serialize_fn.as_ref(),
                        *next_stmt_id,
                    );
                }
                BuildersOrCallback::Callback(leaf_callback, _) => {
                    leaf_callback(self, next_stmt_id);
                }
            }

            *next_stmt_id += 1;
        }

        // `input -> dest_sink(sink)`
        HydroRoot::DestSink { sink, input, .. } => {
            let input_ident =
                input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

            match builders_or_callback {
                BuildersOrCallback::Builders(graph_builders) => {
                    graph_builders
                        .get_dfir_mut(&input.metadata().location_id)
                        .add_dfir(
                            parse_quote! {
                                #input_ident -> dest_sink(#sink);
                            },
                            None,
                            Some(&next_stmt_id.to_string()),
                        );
                }
                BuildersOrCallback::Callback(leaf_callback, _) => {
                    leaf_callback(self, next_stmt_id);
                }
            }

            *next_stmt_id += 1;
        }

        // Binds the input to the identifier derived from `cycle_id`.
        HydroRoot::CycleSink {
            cycle_id, input, ..
        } => {
            let input_ident =
                input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

            match builders_or_callback {
                BuildersOrCallback::Builders(graph_builders) => {
                    // The `identity` operator is given an explicit element type:
                    // keyed collections flow as `(key, value)` tuples, all other
                    // kinds as their plain element type.
                    let elem_type: syn::Type = match &input.metadata().collection_kind {
                        CollectionKind::KeyedSingleton {
                            key_type,
                            value_type,
                            ..
                        }
                        | CollectionKind::KeyedStream {
                            key_type,
                            value_type,
                            ..
                        } => {
                            parse_quote!((#key_type, #value_type))
                        }
                        CollectionKind::Stream { element_type, .. }
                        | CollectionKind::Singleton { element_type, .. }
                        | CollectionKind::Optional { element_type, .. } => {
                            parse_quote!(#element_type)
                        }
                    };

                    let cycle_id_ident = cycle_id.as_ident();
                    graph_builders
                        .get_dfir_mut(&input.metadata().location_id)
                        .add_dfir(
                            parse_quote! {
                                #cycle_id_ident = #input_ident -> identity::<#elem_type>();
                            },
                            None,
                            None,
                        );
                }
                // Cycle sinks are not reported to the callback, and (see below)
                // they do not consume a statement id either.
                BuildersOrCallback::Callback(_, _) => {}
            }
        }

        // `input -> for_each(&mut ident)`
        HydroRoot::EmbeddedOutput { ident, input, .. } => {
            let input_ident =
                input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

            match builders_or_callback {
                BuildersOrCallback::Builders(graph_builders) => {
                    graph_builders
                        .get_dfir_mut(&input.metadata().location_id)
                        .add_dfir(
                            parse_quote! {
                                #input_ident -> for_each(&mut #ident);
                            },
                            None,
                            Some(&next_stmt_id.to_string()),
                        );
                }
                BuildersOrCallback::Callback(leaf_callback, _) => {
                    leaf_callback(self, next_stmt_id);
                }
            }

            *next_stmt_id += 1;
        }

        // Consumes the input and discards every element.
        HydroRoot::Null { input, .. } => {
            let input_ident =
                input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

            match builders_or_callback {
                BuildersOrCallback::Builders(graph_builders) => {
                    graph_builders
                        .get_dfir_mut(&input.metadata().location_id)
                        .add_dfir(
                            parse_quote! {
                                #input_ident -> for_each(|_| {});
                            },
                            None,
                            Some(&next_stmt_id.to_string()),
                        );
                }
                BuildersOrCallback::Callback(leaf_callback, _) => {
                    leaf_callback(self, next_stmt_id);
                }
            }

            *next_stmt_id += 1;
        }
    }
}
1481
1482 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1483 match self {
1484 HydroRoot::ForEach { op_metadata, .. }
1485 | HydroRoot::SendExternal { op_metadata, .. }
1486 | HydroRoot::DestSink { op_metadata, .. }
1487 | HydroRoot::CycleSink { op_metadata, .. }
1488 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1489 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1490 }
1491 }
1492
1493 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1494 match self {
1495 HydroRoot::ForEach { op_metadata, .. }
1496 | HydroRoot::SendExternal { op_metadata, .. }
1497 | HydroRoot::DestSink { op_metadata, .. }
1498 | HydroRoot::CycleSink { op_metadata, .. }
1499 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1500 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1501 }
1502 }
1503
1504 pub fn input(&self) -> &HydroNode {
1505 match self {
1506 HydroRoot::ForEach { input, .. }
1507 | HydroRoot::SendExternal { input, .. }
1508 | HydroRoot::DestSink { input, .. }
1509 | HydroRoot::CycleSink { input, .. }
1510 | HydroRoot::EmbeddedOutput { input, .. }
1511 | HydroRoot::Null { input, .. } => input,
1512 }
1513 }
1514
1515 pub fn input_metadata(&self) -> &HydroIrMetadata {
1516 self.input().metadata()
1517 }
1518
1519 pub fn print_root(&self) -> String {
1520 match self {
1521 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1522 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1523 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1524 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1525 HydroRoot::EmbeddedOutput { ident, .. } => {
1526 format!("EmbeddedOutput({})", ident)
1527 }
1528 HydroRoot::Null { .. } => "Null".to_owned(),
1529 }
1530 }
1531
1532 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1533 match self {
1534 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1535 transform(f);
1536 }
1537 HydroRoot::SendExternal { .. }
1538 | HydroRoot::CycleSink { .. }
1539 | HydroRoot::EmbeddedOutput { .. }
1540 | HydroRoot::Null { .. } => {}
1541 }
1542 }
1543}
1544
/// Returns the clock id of the tick that `loc` refers to, looking through any
/// number of `Atomic` wrappers. Yields `None` for `Process` and `Cluster`
/// locations.
#[cfg(feature = "build")]
fn tick_of(loc: &LocationId) -> Option<ClockId> {
    match loc {
        LocationId::Process(_) | LocationId::Cluster(_) => None,
        LocationId::Atomic(wrapped) => tick_of(wrapped),
        LocationId::Tick(clock, _) => Some(*clock),
    }
}
1553
/// Rewrites, in place, every tick id found in `loc` to the representative of
/// its union-find set in `uf`.
///
/// The walk descends through `Tick` and `Atomic` layers and stops at
/// `Process` / `Cluster`, which are left untouched.
#[cfg(feature = "build")]
fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
    match loc {
        LocationId::Process(_) | LocationId::Cluster(_) => {}
        LocationId::Atomic(nested) => remap_location(nested, uf),
        LocationId::Tick(clock, nested) => {
            let representative = uf_find(uf, *clock);
            *clock = representative;
            remap_location(nested, uf);
        }
    }
}
1567
/// Union-find `find`: returns the representative of the set containing `x`.
///
/// An id with no entry in `parent` (or one that maps to itself) is its own
/// representative. Every id visited on the way from `x` to the representative
/// is re-pointed directly at it, so later lookups are shorter.
#[cfg(feature = "build")]
fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
    // Pass 1: climb to the representative.
    let mut root = x;
    loop {
        let up = *parent.get(&root).unwrap_or(&root);
        if up == root {
            break;
        }
        root = up;
    }

    // Pass 2: walk the same path again, pointing each visited id at `root`.
    let mut cursor = x;
    while cursor != root {
        let up = *parent.get(&cursor).unwrap_or(&cursor);
        parent.insert(cursor, root);
        cursor = up;
    }

    root
}
1578
/// Union-find `union`: merges the sets containing `a` and `b`.
///
/// When the two ids are in different sets, the representative of `a`'s set is
/// attached beneath the representative of `b`'s set.
#[cfg(feature = "build")]
fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
    let root_a = uf_find(parent, a);
    let root_b = uf_find(parent, b);
    if root_a == root_b {
        // Already in the same set.
        return;
    }
    parent.insert(root_a, root_b);
}
1587
/// Merges tick ids that are linked through `Batch` / `YieldConcat` nodes and
/// rewrites every node's location to use one representative id per group.
///
/// Two traversals are made over `ir`:
/// 1. for each `Batch` / `YieldConcat` node whose inner location and own
///    location both resolve to a tick (see `tick_of`), the two tick ids are
///    unioned;
/// 2. the `location_id` in every node's metadata is remapped through the
///    resulting union-find.
///
/// Roots themselves are not modified, and well-formedness checking is off for
/// both traversals.
#[cfg(feature = "build")]
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();

    // Pass 1: collect the equivalences.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| {
            if let HydroNode::Batch { inner, metadata }
            | HydroNode::YieldConcat { inner, metadata } = node
            {
                let inner_tick = tick_of(&inner.metadata().location_id);
                let outer_tick = tick_of(&metadata.location_id);
                if let (Some(a), Some(b)) = (inner_tick, outer_tick) {
                    uf_union(&mut uf, a, b);
                }
            }
        },
        false,
    );

    // Pass 2: apply them to every node's location.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| {
            let location = &mut node.metadata_mut().location_id;
            remap_location(location, &mut uf);
        },
        false,
    );
}
1623
/// Emits every root in `ir` and returns the graph builders that were filled in.
///
/// The shared-node tables and the statement-id counter start empty / at zero
/// and are shared across all roots, in slice order.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut graph_builders = SecondaryMap::new();
    let mut shared_seen = HashMap::new();
    let mut shared_built = HashMap::new();
    let mut stmt_counter = 0;

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut graph_builders,
            &mut shared_seen,
            &mut shared_built,
            &mut stmt_counter,
        );
    });

    graph_builders
}
1640
/// Walks `ir` in emission order without building any graph.
///
/// Each root's `emit_core` is run in callback mode with `transform_root` and
/// `transform_node` as the two callbacks. The shared-node tables and the
/// statement-id counter start empty / at zero and are shared across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut shared_seen = HashMap::new();
    let mut shared_built = HashMap::new();
    let mut stmt_counter = 0;

    for root in ir.iter_mut() {
        root.emit_core(
            &mut callback,
            &mut shared_seen,
            &mut shared_built,
            &mut stmt_counter,
        );
    }
}
1660
1661pub fn transform_bottom_up(
1662 ir: &mut [HydroRoot],
1663 transform_root: &mut impl FnMut(&mut HydroRoot),
1664 transform_node: &mut impl FnMut(&mut HydroNode),
1665 check_well_formed: bool,
1666) {
1667 let mut seen_tees = HashMap::new();
1668 ir.iter_mut().for_each(|leaf| {
1669 leaf.transform_bottom_up(
1670 transform_root,
1671 transform_node,
1672 &mut seen_tees,
1673 check_well_formed,
1674 );
1675 });
1676}
1677
1678pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1679 let mut seen_tees = HashMap::new();
1680 ir.iter()
1681 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1682 .collect()
1683}
1684
/// Per-thread de-duplication state for shared nodes.
///
/// `None` means no de-duplication scope is active. `Some((next_id, ids))`
/// holds the next id to hand out and the id already assigned to each shared
/// cell, keyed by the cell's address.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // State used by `Debug for SharedNode`; a scope is opened by `dbg_dedup_tee`.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
    // State used by `Serialize for SharedNode`; a scope is opened by
    // `serialize_dedup_shared`. Serializing a `SharedNode` without an active
    // scope is reported as a serialization error.
    static SERIALIZED_SHARED: PrintedTees
        = const { RefCell::new(None) };
}
1694
1695pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1696 PRINTED_TEES.with(|printed_tees| {
1697 let mut printed_tees_mut = printed_tees.borrow_mut();
1698 *printed_tees_mut = Some((0, HashMap::new()));
1699 drop(printed_tees_mut);
1700
1701 let ret = f();
1702
1703 let mut printed_tees_mut = printed_tees.borrow_mut();
1704 *printed_tees_mut = None;
1705
1706 ret
1707 })
1708}
1709
1710pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1715 let _guard = SerializedSharedGuard::enter();
1716 f()
1717}
1718
/// Scope guard for `SERIALIZED_SHARED`.
///
/// Created by `SerializedSharedGuard::enter`, which installs a fresh table;
/// dropping the guard puts `previous` back.
struct SerializedSharedGuard {
    /// The `SERIALIZED_SHARED` contents that were in place when the scope was
    /// entered (`None` if no scope was active).
    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
}
1724
1725impl SerializedSharedGuard {
1726 fn enter() -> Self {
1727 let previous = SERIALIZED_SHARED.with(|cell| {
1728 let mut guard = cell.borrow_mut();
1729 guard.replace((0, HashMap::new()))
1730 });
1731 Self { previous }
1732 }
1733}
1734
1735impl Drop for SerializedSharedGuard {
1736 fn drop(&mut self) {
1737 SERIALIZED_SHARED.with(|cell| {
1738 *cell.borrow_mut() = self.previous.take();
1739 });
1740 }
1741}
1742
/// A reference-counted, interior-mutable handle to a [`HydroNode`] that several
/// parents can point at (it is the `inner` of `HydroNode::Tee` and
/// `HydroNode::Partition`).
///
/// Identity is the address of the shared cell (see `SharedNode::as_ptr`); the
/// `Debug` and `Serialize` impls use it to emit each shared node only once per
/// de-duplication scope.
pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1744
1745impl serde::Serialize for SharedNode {
1746 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
1757 SERIALIZED_SHARED.with(|cell| {
1758 let mut guard = cell.borrow_mut();
1759 let state = guard.as_mut().ok_or_else(|| {
1761 serde::ser::Error::custom(
1762 "SharedNode serialization requires an active serialize_dedup_shared scope",
1763 )
1764 })?;
1765 let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
1766
1767 if let Some(&id) = state.1.get(&ptr) {
1768 drop(guard);
1769 use serde::ser::SerializeMap;
1770 let mut map = serializer.serialize_map(Some(1))?;
1771 map.serialize_entry("$shared_ref", &id)?;
1772 map.end()
1773 } else {
1774 let id = state.0;
1775 state.0 += 1;
1776 state.1.insert(ptr, id);
1777 drop(guard);
1778
1779 use serde::ser::SerializeMap;
1780 let mut map = serializer.serialize_map(Some(2))?;
1781 map.serialize_entry("$shared", &id)?;
1782 map.serialize_entry("node", &*self.0.borrow())?;
1783 map.end()
1784 }
1785 })
1786 }
1787}
1788
1789impl SharedNode {
1790 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1791 Rc::as_ptr(&self.0)
1792 }
1793}
1794
1795impl Debug for SharedNode {
1796 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1797 PRINTED_TEES.with(|printed_tees| {
1798 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1799 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1800
1801 if let Some(printed_tees_mut) = printed_tees_mut {
1802 if let Some(existing) = printed_tees_mut
1803 .1
1804 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1805 {
1806 write!(f, "<shared {}>", existing)
1807 } else {
1808 let next_id = printed_tees_mut.0;
1809 printed_tees_mut.0 += 1;
1810 printed_tees_mut
1811 .1
1812 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1813 drop(printed_tees_mut_borrow);
1814 write!(f, "<shared {}>: ", next_id)?;
1815 Debug::fmt(&self.0.borrow(), f)
1816 }
1817 } else {
1818 drop(printed_tees_mut_borrow);
1819 write!(f, "<shared>: ")?;
1820 Debug::fmt(&self.0.borrow(), f)
1821 }
1822 })
1823 }
1824}
1825
1826impl Hash for SharedNode {
1827 fn hash<H: Hasher>(&self, state: &mut H) {
1828 self.0.borrow_mut().hash(state);
1829 }
1830}
1831
/// Boundedness marker stored in the `bound` field of `CollectionKind::Stream`,
/// `CollectionKind::Optional` and `CollectionKind::KeyedStream`.
///
/// `CollectionKind::is_bounded` returns `true` for those variants only when
/// the marker is `Bounded`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
1837
/// Ordering marker stored as `order` on `CollectionKind::Stream` and as
/// `value_order` on `CollectionKind::KeyedStream`.
///
/// NOTE(review): presumably `TotalOrder` means elements arrive in a
/// deterministic order and `NoOrder` means no such guarantee — confirm
/// against the stream API.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
1843
/// Delivery marker stored as `retry` on `CollectionKind::Stream` and as
/// `value_retry` on `CollectionKind::KeyedStream`.
///
/// NOTE(review): presumably `AtLeastOnce` allows duplicated elements while
/// `ExactlyOnce` does not — confirm against the stream API.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1849
/// Boundedness marker stored in the `bound` field of
/// `CollectionKind::KeyedSingleton`.
///
/// `CollectionKind::is_bounded` treats only `Bounded` as bounded;
/// `MonotonicValue` and `BoundedValue` count as not bounded there.
///
/// NOTE(review): the precise meaning of `MonotonicValue` / `BoundedValue` is
/// not visible in this file — confirm against the keyed-singleton API.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    MonotonicValue,
    BoundedValue,
    Bounded,
}
1857
/// Boundedness marker stored in the `bound` field of
/// `CollectionKind::Singleton`.
///
/// `CollectionKind::is_bounded` treats only `Bounded` as bounded; `Monotonic`
/// counts as not bounded there.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum SingletonBoundKind {
    Unbounded,
    Monotonic,
    Bounded,
}
1864
/// Describes what kind of collection a node produces, together with its
/// guarantees and element type(s). Stored in `HydroIrMetadata::collection_kind`.
///
/// Keyed variants carry separate `key_type` / `value_type`; the others carry a
/// single `element_type`.
#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
pub enum CollectionKind {
    /// A stream of `element_type` values with the given boundedness, ordering
    /// and retry markers.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// A singleton of `element_type`.
    Singleton {
        bound: SingletonBoundKind,
        element_type: DebugType,
    },
    /// An optional of `element_type`.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A keyed stream; the ordering and retry markers apply to the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A keyed singleton.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1894
1895impl CollectionKind {
1896 pub fn is_bounded(&self) -> bool {
1897 matches!(
1898 self,
1899 CollectionKind::Stream {
1900 bound: BoundKind::Bounded,
1901 ..
1902 } | CollectionKind::Singleton {
1903 bound: SingletonBoundKind::Bounded,
1904 ..
1905 } | CollectionKind::Optional {
1906 bound: BoundKind::Bounded,
1907 ..
1908 } | CollectionKind::KeyedStream {
1909 bound: BoundKind::Bounded,
1910 ..
1911 } | CollectionKind::KeyedSingleton {
1912 bound: KeyedSingletonBoundKind::Bounded,
1913 ..
1914 }
1915 )
1916 }
1917}
1918
/// Metadata attached to every [`HydroNode`].
///
/// The hand-written `Hash`, `PartialEq` and `Eq` impls for this type ignore
/// all fields, so metadata never influences hashing or equality of the nodes
/// that contain it. The `Debug` impl prints only `location_id` and
/// `collection_kind`.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrMetadata {
    /// Where the node lives.
    pub location_id: LocationId,
    /// What kind of collection the node produces.
    pub collection_kind: CollectionKind,
    /// NOTE(review): presumably an estimated or measured element count filled
    /// in by analysis passes — not set anywhere in the visible code; confirm.
    pub cardinality: Option<usize>,
    /// NOTE(review): presumably an optional user-supplied label — confirm.
    pub tag: Option<String>,
    /// Per-operator metadata (backtrace, profiling figures, id).
    pub op: HydroIrOpMetadata,
}
1927
1928impl Hash for HydroIrMetadata {
1930 fn hash<H: Hasher>(&self, _: &mut H) {}
1931}
1932
1933impl PartialEq for HydroIrMetadata {
1934 fn eq(&self, _: &Self) -> bool {
1935 true
1936 }
1937}
1938
// `eq` is constantly `true`, which is trivially reflexive, symmetric and
// transitive, so the `Eq` marker is sound.
impl Eq for HydroIrMetadata {}
1940
1941impl Debug for HydroIrMetadata {
1942 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1943 f.debug_struct("HydroIrMetadata")
1944 .field("location_id", &self.location_id)
1945 .field("collection_kind", &self.collection_kind)
1946 .finish()
1947 }
1948}
1949
/// Per-operator metadata, stored both on roots (`op_metadata`) and inside
/// `HydroIrMetadata::op`.
///
/// The hand-written `Hash` impl ignores all fields and the `Debug` impl prints
/// none of them.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was created. Serialized under the
    /// name `span` via `serialize_backtrace_as_span`.
    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
    pub backtrace: Backtrace,
    /// NOTE(review): presumably a profiling figure filled in after a run —
    /// initialised to `None` by the constructors; confirm units with the producer.
    pub cpu_usage: Option<f64>,
    /// NOTE(review): presumably the receive-side network share of CPU usage —
    /// initialised to `None` by the constructors; confirm with the producer.
    pub network_recv_cpu_usage: Option<f64>,
    /// Optional operator id; `None` until assigned.
    pub id: Option<usize>,
}
1960
impl HydroIrOpMetadata {
    /// Creates metadata with a freshly captured backtrace and every optional
    /// field set to `None`.
    ///
    /// There is deliberately no `Default` impl (see the lint expectation
    /// below): construction has to go through an explicit call so the
    /// backtrace is captured at the right place.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Like [`Self::new`], but `skip_count` is added to the value handed to
    /// `Backtrace::get_backtrace`.
    ///
    /// NOTE(review): the argument is presumably the number of innermost stack
    /// frames to drop, with the constant `2` covering this function and the
    /// capture machinery. The value depends on the exact call depth, so do not
    /// inline or wrap these constructors without re-checking it — confirm
    /// against `Backtrace::get_backtrace`.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
1979
1980impl Debug for HydroIrOpMetadata {
1981 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1982 f.debug_struct("HydroIrOpMetadata").finish()
1983 }
1984}
1985
1986impl Hash for HydroIrOpMetadata {
1987 fn hash<H: Hasher>(&self, _: &mut H) {}
1988}
1989
/// A node of the Hydro IR dataflow graph.
///
/// Every variant except `Placeholder` carries a `metadata` field. Upstream
/// nodes are owned through `Box<HydroNode>`, except for `Tee` and `Partition`,
/// whose upstream is a [`SharedNode`] that several nodes may point at.
/// `HydroNode::transform_children` is the authoritative list of which fields
/// are children of which variant.
///
/// The variant docs below describe structure visible in this file; the exact
/// runtime semantics of each operator are defined where the nodes are emitted.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroNode {
    /// Temporary stand-in written into a slot while its real contents are moved
    /// out (for example while a shared node is being transformed or cloned).
    /// `transform_children` panics if asked to traverse one.
    Placeholder,

    /// Single-input wrapper around `inner`.
    /// NOTE(review): presumably re-labels the collection kind without changing
    /// the data — confirm against the emitter.
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input wrapper around `inner` carrying a `trusted` flag.
    /// NOTE(review): presumably marks a point where non-deterministic behaviour
    /// is observed, with `trusted` indicating it needs no further checking —
    /// confirm against the emitter.
    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool, metadata: HydroIrMetadata,
    },

    /// Leaf node (no children) producing data from `source`.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no children) producing the single `value`.
    /// NOTE(review): `first_tick_only` presumably restricts emission to the
    /// first tick — confirm against the emitter.
    SingletonSource {
        value: DebugExpr,
        first_tick_only: bool,
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no children) that reads from the cycle identified by
    /// `cycle_id`; the matching writer is `HydroRoot::CycleSink`.
    CycleSource {
        cycle_id: CycleId,
        metadata: HydroIrMetadata,
    },

    /// One consumer of a shared upstream node.
    Tee {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    /// One consumer of a shared upstream node, with predicate `f` and a flag
    /// `is_true`.
    /// NOTE(review): presumably selects the elements for which `f` evaluates to
    /// `is_true` — confirm against the emitter.
    Partition {
        inner: SharedNode,
        f: DebugExpr,
        is_true: bool,
        metadata: HydroIrMetadata,
    },

    /// Single-input wrapper around `inner`.
    /// NOTE(review): presumably opens an atomic section — confirm.
    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input wrapper around `inner`.
    /// NOTE(review): presumably closes an atomic section — confirm.
    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input wrapper around `inner`. `unify_atomic_ticks` unions the
    /// tick of `inner`'s location with the tick of this node's location when
    /// both exist.
    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input wrapper around `inner`. Treated like `Batch` by
    /// `unify_atomic_ticks`.
    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node over `first` and `second`.
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node over `first` and `second`.
    MergeOrdered {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node over `first` and `second`.
    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node over `left` and `right`.
    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node over `left` and `right`.
    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node over `left` and `right`.
    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node over `left` and `right`.
    /// NOTE(review): how this differs from `Join` is not visible in this file —
    /// confirm against the emitter.
    JoinHalf {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node over `pos` and `neg`.
    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node over `pos` and `neg`.
    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node over `input`.
    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node over `input`.
    ResolveFuturesBlocking {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node over `input`.
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node applying expression `f` to `input`.
    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node applying expression `f` to `input`.
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node applying expression `f` to `input`.
    FlatMapStreamBlocking {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node applying expression `f` to `input`.
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node applying expression `f` to `input`.
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node over `input`.
    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node over `input`.
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node applying expression `f` to `input`.
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node over `input`.
    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node over `input`.
    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input aggregation over `input` with an initial-value expression
    /// `init` and an accumulator expression `acc`.
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node over `input` with `init` and `acc` expressions.
    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node over `input` with `init` and `acc` expressions.
    ScanAsyncBlocking {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node over `input` with `init` and `acc` expressions.
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node applying expression `f` to `input`.
    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node applying expression `f` to `input`.
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Two-input node: `input` plus a separate `watermark` node, with
    /// expression `f`.
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node that moves `input` across the network. It is the one
    /// variant exempt from the "child and parent share a root location" check
    /// in `HydroNode::transform_bottom_up`.
    Network {
        name: Option<String>,
        networking_info: crate::networking::NetworkingInfo,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no children) receiving data from an external source
    /// identified by `from_external_key` / `from_port_id`. `port_hint` is not
    /// serialized.
    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        #[serde(skip)]
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node over `input` carrying a `tag`, a `duration`
    /// expression and a `prefix`.
    /// NOTE(review): presumably counts elements and reports periodically —
    /// confirm against the emitter.
    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
2259
/// Table used while transforming or cloning the IR: maps the address of an
/// *original* shared cell to the cell that replaces it, so that every
/// `Tee` / `Partition` pointing at the same original ends up sharing the same
/// replacement and the shared subtree is processed only once.
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a shared cell to a `LocationId`.
/// NOTE(review): not used in the visible part of this file; presumably records
/// the location already determined for each shared node — confirm at the use
/// sites.
pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2262
2263impl HydroNode {
2264 pub fn transform_bottom_up(
2265 &mut self,
2266 transform: &mut impl FnMut(&mut HydroNode),
2267 seen_tees: &mut SeenSharedNodes,
2268 check_well_formed: bool,
2269 ) {
2270 self.transform_children(
2271 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2272 seen_tees,
2273 );
2274
2275 transform(self);
2276
2277 let self_location = self.metadata().location_id.root();
2278
2279 if check_well_formed {
2280 match &*self {
2281 HydroNode::Network { .. } => {}
2282 _ => {
2283 self.input_metadata().iter().for_each(|i| {
2284 if i.location_id.root() != self_location {
2285 panic!(
2286 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2287 i,
2288 i.location_id.root(),
2289 self,
2290 self_location
2291 )
2292 }
2293 });
2294 }
2295 }
2296 }
2297 }
2298
2299 #[inline(always)]
2300 pub fn transform_children(
2301 &mut self,
2302 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2303 seen_tees: &mut SeenSharedNodes,
2304 ) {
2305 match self {
2306 HydroNode::Placeholder => {
2307 panic!();
2308 }
2309
2310 HydroNode::Source { .. }
2311 | HydroNode::SingletonSource { .. }
2312 | HydroNode::CycleSource { .. }
2313 | HydroNode::ExternalInput { .. } => {}
2314
2315 HydroNode::Tee { inner, .. } => {
2316 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2317 *inner = SharedNode(transformed.clone());
2318 } else {
2319 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2320 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2321 let mut orig = inner.0.replace(HydroNode::Placeholder);
2322 transform(&mut orig, seen_tees);
2323 *transformed_cell.borrow_mut() = orig;
2324 *inner = SharedNode(transformed_cell);
2325 }
2326 }
2327
2328 HydroNode::Partition { inner, .. } => {
2329 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2330 *inner = SharedNode(transformed.clone());
2331 } else {
2332 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2333 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2334 let mut orig = inner.0.replace(HydroNode::Placeholder);
2335 transform(&mut orig, seen_tees);
2336 *transformed_cell.borrow_mut() = orig;
2337 *inner = SharedNode(transformed_cell);
2338 }
2339 }
2340
2341 HydroNode::Cast { inner, .. }
2342 | HydroNode::ObserveNonDet { inner, .. }
2343 | HydroNode::BeginAtomic { inner, .. }
2344 | HydroNode::EndAtomic { inner, .. }
2345 | HydroNode::Batch { inner, .. }
2346 | HydroNode::YieldConcat { inner, .. } => {
2347 transform(inner.as_mut(), seen_tees);
2348 }
2349
2350 HydroNode::Chain { first, second, .. } => {
2351 transform(first.as_mut(), seen_tees);
2352 transform(second.as_mut(), seen_tees);
2353 }
2354
2355 HydroNode::MergeOrdered { first, second, .. } => {
2356 transform(first.as_mut(), seen_tees);
2357 transform(second.as_mut(), seen_tees);
2358 }
2359
2360 HydroNode::ChainFirst { first, second, .. } => {
2361 transform(first.as_mut(), seen_tees);
2362 transform(second.as_mut(), seen_tees);
2363 }
2364
2365 HydroNode::CrossSingleton { left, right, .. }
2366 | HydroNode::CrossProduct { left, right, .. }
2367 | HydroNode::Join { left, right, .. }
2368 | HydroNode::JoinHalf { left, right, .. } => {
2369 transform(left.as_mut(), seen_tees);
2370 transform(right.as_mut(), seen_tees);
2371 }
2372
2373 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2374 transform(pos.as_mut(), seen_tees);
2375 transform(neg.as_mut(), seen_tees);
2376 }
2377
2378 HydroNode::ReduceKeyedWatermark {
2379 input, watermark, ..
2380 } => {
2381 transform(input.as_mut(), seen_tees);
2382 transform(watermark.as_mut(), seen_tees);
2383 }
2384
2385 HydroNode::Map { input, .. }
2386 | HydroNode::ResolveFutures { input, .. }
2387 | HydroNode::ResolveFuturesBlocking { input, .. }
2388 | HydroNode::ResolveFuturesOrdered { input, .. }
2389 | HydroNode::FlatMap { input, .. }
2390 | HydroNode::FlatMapStreamBlocking { input, .. }
2391 | HydroNode::Filter { input, .. }
2392 | HydroNode::FilterMap { input, .. }
2393 | HydroNode::Sort { input, .. }
2394 | HydroNode::DeferTick { input, .. }
2395 | HydroNode::Enumerate { input, .. }
2396 | HydroNode::Inspect { input, .. }
2397 | HydroNode::Unique { input, .. }
2398 | HydroNode::Network { input, .. }
2399 | HydroNode::Fold { input, .. }
2400 | HydroNode::Scan { input, .. }
2401 | HydroNode::ScanAsyncBlocking { input, .. }
2402 | HydroNode::FoldKeyed { input, .. }
2403 | HydroNode::Reduce { input, .. }
2404 | HydroNode::ReduceKeyed { input, .. }
2405 | HydroNode::Counter { input, .. } => {
2406 transform(input.as_mut(), seen_tees);
2407 }
2408 }
2409 }
2410
2411 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2412 match self {
2413 HydroNode::Placeholder => HydroNode::Placeholder,
2414 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2415 inner: Box::new(inner.deep_clone(seen_tees)),
2416 metadata: metadata.clone(),
2417 },
2418 HydroNode::ObserveNonDet {
2419 inner,
2420 trusted,
2421 metadata,
2422 } => HydroNode::ObserveNonDet {
2423 inner: Box::new(inner.deep_clone(seen_tees)),
2424 trusted: *trusted,
2425 metadata: metadata.clone(),
2426 },
2427 HydroNode::Source { source, metadata } => HydroNode::Source {
2428 source: source.clone(),
2429 metadata: metadata.clone(),
2430 },
2431 HydroNode::SingletonSource {
2432 value,
2433 first_tick_only,
2434 metadata,
2435 } => HydroNode::SingletonSource {
2436 value: value.clone(),
2437 first_tick_only: *first_tick_only,
2438 metadata: metadata.clone(),
2439 },
2440 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2441 cycle_id: *cycle_id,
2442 metadata: metadata.clone(),
2443 },
2444 HydroNode::Tee { inner, metadata } => {
2445 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2446 HydroNode::Tee {
2447 inner: SharedNode(transformed.clone()),
2448 metadata: metadata.clone(),
2449 }
2450 } else {
2451 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2452 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2453 let cloned = inner.0.borrow().deep_clone(seen_tees);
2454 *new_rc.borrow_mut() = cloned;
2455 HydroNode::Tee {
2456 inner: SharedNode(new_rc),
2457 metadata: metadata.clone(),
2458 }
2459 }
2460 }
2461 HydroNode::Partition {
2462 inner,
2463 f,
2464 is_true,
2465 metadata,
2466 } => {
2467 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2468 HydroNode::Partition {
2469 inner: SharedNode(transformed.clone()),
2470 f: f.clone(),
2471 is_true: *is_true,
2472 metadata: metadata.clone(),
2473 }
2474 } else {
2475 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2476 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2477 let cloned = inner.0.borrow().deep_clone(seen_tees);
2478 *new_rc.borrow_mut() = cloned;
2479 HydroNode::Partition {
2480 inner: SharedNode(new_rc),
2481 f: f.clone(),
2482 is_true: *is_true,
2483 metadata: metadata.clone(),
2484 }
2485 }
2486 }
2487 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2488 inner: Box::new(inner.deep_clone(seen_tees)),
2489 metadata: metadata.clone(),
2490 },
2491 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2492 inner: Box::new(inner.deep_clone(seen_tees)),
2493 metadata: metadata.clone(),
2494 },
2495 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2496 inner: Box::new(inner.deep_clone(seen_tees)),
2497 metadata: metadata.clone(),
2498 },
2499 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2500 inner: Box::new(inner.deep_clone(seen_tees)),
2501 metadata: metadata.clone(),
2502 },
2503 HydroNode::Chain {
2504 first,
2505 second,
2506 metadata,
2507 } => HydroNode::Chain {
2508 first: Box::new(first.deep_clone(seen_tees)),
2509 second: Box::new(second.deep_clone(seen_tees)),
2510 metadata: metadata.clone(),
2511 },
2512 HydroNode::MergeOrdered {
2513 first,
2514 second,
2515 metadata,
2516 } => HydroNode::MergeOrdered {
2517 first: Box::new(first.deep_clone(seen_tees)),
2518 second: Box::new(second.deep_clone(seen_tees)),
2519 metadata: metadata.clone(),
2520 },
2521 HydroNode::ChainFirst {
2522 first,
2523 second,
2524 metadata,
2525 } => HydroNode::ChainFirst {
2526 first: Box::new(first.deep_clone(seen_tees)),
2527 second: Box::new(second.deep_clone(seen_tees)),
2528 metadata: metadata.clone(),
2529 },
2530 HydroNode::CrossProduct {
2531 left,
2532 right,
2533 metadata,
2534 } => HydroNode::CrossProduct {
2535 left: Box::new(left.deep_clone(seen_tees)),
2536 right: Box::new(right.deep_clone(seen_tees)),
2537 metadata: metadata.clone(),
2538 },
2539 HydroNode::CrossSingleton {
2540 left,
2541 right,
2542 metadata,
2543 } => HydroNode::CrossSingleton {
2544 left: Box::new(left.deep_clone(seen_tees)),
2545 right: Box::new(right.deep_clone(seen_tees)),
2546 metadata: metadata.clone(),
2547 },
2548 HydroNode::Join {
2549 left,
2550 right,
2551 metadata,
2552 } => HydroNode::Join {
2553 left: Box::new(left.deep_clone(seen_tees)),
2554 right: Box::new(right.deep_clone(seen_tees)),
2555 metadata: metadata.clone(),
2556 },
2557 HydroNode::JoinHalf {
2558 left,
2559 right,
2560 metadata,
2561 } => HydroNode::JoinHalf {
2562 left: Box::new(left.deep_clone(seen_tees)),
2563 right: Box::new(right.deep_clone(seen_tees)),
2564 metadata: metadata.clone(),
2565 },
2566 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2567 pos: Box::new(pos.deep_clone(seen_tees)),
2568 neg: Box::new(neg.deep_clone(seen_tees)),
2569 metadata: metadata.clone(),
2570 },
2571 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2572 pos: Box::new(pos.deep_clone(seen_tees)),
2573 neg: Box::new(neg.deep_clone(seen_tees)),
2574 metadata: metadata.clone(),
2575 },
2576 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2577 input: Box::new(input.deep_clone(seen_tees)),
2578 metadata: metadata.clone(),
2579 },
2580 HydroNode::ResolveFuturesBlocking { input, metadata } => {
2581 HydroNode::ResolveFuturesBlocking {
2582 input: Box::new(input.deep_clone(seen_tees)),
2583 metadata: metadata.clone(),
2584 }
2585 }
2586 HydroNode::ResolveFuturesOrdered { input, metadata } => {
2587 HydroNode::ResolveFuturesOrdered {
2588 input: Box::new(input.deep_clone(seen_tees)),
2589 metadata: metadata.clone(),
2590 }
2591 }
2592 HydroNode::Map { f, input, metadata } => HydroNode::Map {
2593 f: f.clone(),
2594 input: Box::new(input.deep_clone(seen_tees)),
2595 metadata: metadata.clone(),
2596 },
2597 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2598 f: f.clone(),
2599 input: Box::new(input.deep_clone(seen_tees)),
2600 metadata: metadata.clone(),
2601 },
2602 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2603 HydroNode::FlatMapStreamBlocking {
2604 f: f.clone(),
2605 input: Box::new(input.deep_clone(seen_tees)),
2606 metadata: metadata.clone(),
2607 }
2608 }
2609 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2610 f: f.clone(),
2611 input: Box::new(input.deep_clone(seen_tees)),
2612 metadata: metadata.clone(),
2613 },
2614 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2615 f: f.clone(),
2616 input: Box::new(input.deep_clone(seen_tees)),
2617 metadata: metadata.clone(),
2618 },
2619 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2620 input: Box::new(input.deep_clone(seen_tees)),
2621 metadata: metadata.clone(),
2622 },
2623 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2624 input: Box::new(input.deep_clone(seen_tees)),
2625 metadata: metadata.clone(),
2626 },
2627 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2628 f: f.clone(),
2629 input: Box::new(input.deep_clone(seen_tees)),
2630 metadata: metadata.clone(),
2631 },
2632 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2633 input: Box::new(input.deep_clone(seen_tees)),
2634 metadata: metadata.clone(),
2635 },
2636 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2637 input: Box::new(input.deep_clone(seen_tees)),
2638 metadata: metadata.clone(),
2639 },
2640 HydroNode::Fold {
2641 init,
2642 acc,
2643 input,
2644 metadata,
2645 } => HydroNode::Fold {
2646 init: init.clone(),
2647 acc: acc.clone(),
2648 input: Box::new(input.deep_clone(seen_tees)),
2649 metadata: metadata.clone(),
2650 },
2651 HydroNode::Scan {
2652 init,
2653 acc,
2654 input,
2655 metadata,
2656 } => HydroNode::Scan {
2657 init: init.clone(),
2658 acc: acc.clone(),
2659 input: Box::new(input.deep_clone(seen_tees)),
2660 metadata: metadata.clone(),
2661 },
2662 HydroNode::ScanAsyncBlocking {
2663 init,
2664 acc,
2665 input,
2666 metadata,
2667 } => HydroNode::ScanAsyncBlocking {
2668 init: init.clone(),
2669 acc: acc.clone(),
2670 input: Box::new(input.deep_clone(seen_tees)),
2671 metadata: metadata.clone(),
2672 },
2673 HydroNode::FoldKeyed {
2674 init,
2675 acc,
2676 input,
2677 metadata,
2678 } => HydroNode::FoldKeyed {
2679 init: init.clone(),
2680 acc: acc.clone(),
2681 input: Box::new(input.deep_clone(seen_tees)),
2682 metadata: metadata.clone(),
2683 },
2684 HydroNode::ReduceKeyedWatermark {
2685 f,
2686 input,
2687 watermark,
2688 metadata,
2689 } => HydroNode::ReduceKeyedWatermark {
2690 f: f.clone(),
2691 input: Box::new(input.deep_clone(seen_tees)),
2692 watermark: Box::new(watermark.deep_clone(seen_tees)),
2693 metadata: metadata.clone(),
2694 },
2695 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2696 f: f.clone(),
2697 input: Box::new(input.deep_clone(seen_tees)),
2698 metadata: metadata.clone(),
2699 },
2700 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2701 f: f.clone(),
2702 input: Box::new(input.deep_clone(seen_tees)),
2703 metadata: metadata.clone(),
2704 },
2705 HydroNode::Network {
2706 name,
2707 networking_info,
2708 serialize_fn,
2709 instantiate_fn,
2710 deserialize_fn,
2711 input,
2712 metadata,
2713 } => HydroNode::Network {
2714 name: name.clone(),
2715 networking_info: networking_info.clone(),
2716 serialize_fn: serialize_fn.clone(),
2717 instantiate_fn: instantiate_fn.clone(),
2718 deserialize_fn: deserialize_fn.clone(),
2719 input: Box::new(input.deep_clone(seen_tees)),
2720 metadata: metadata.clone(),
2721 },
2722 HydroNode::ExternalInput {
2723 from_external_key,
2724 from_port_id,
2725 from_many,
2726 codec_type,
2727 port_hint,
2728 instantiate_fn,
2729 deserialize_fn,
2730 metadata,
2731 } => HydroNode::ExternalInput {
2732 from_external_key: *from_external_key,
2733 from_port_id: *from_port_id,
2734 from_many: *from_many,
2735 codec_type: codec_type.clone(),
2736 port_hint: *port_hint,
2737 instantiate_fn: instantiate_fn.clone(),
2738 deserialize_fn: deserialize_fn.clone(),
2739 metadata: metadata.clone(),
2740 },
2741 HydroNode::Counter {
2742 tag,
2743 duration,
2744 prefix,
2745 input,
2746 metadata,
2747 } => HydroNode::Counter {
2748 tag: tag.clone(),
2749 duration: duration.clone(),
2750 prefix: prefix.clone(),
2751 input: Box::new(input.deep_clone(seen_tees)),
2752 metadata: metadata.clone(),
2753 },
2754 }
2755 }
2756
2757 #[cfg(feature = "build")]
2758 pub fn emit_core(
2759 &mut self,
2760 builders_or_callback: &mut BuildersOrCallback<
2761 impl FnMut(&mut HydroRoot, &mut usize),
2762 impl FnMut(&mut HydroNode, &mut usize),
2763 >,
2764 seen_tees: &mut SeenSharedNodes,
2765 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
2766 next_stmt_id: &mut usize,
2767 ) -> syn::Ident {
2768 let mut ident_stack: Vec<syn::Ident> = Vec::new();
2769
2770 self.transform_bottom_up(
2771 &mut |node: &mut HydroNode| {
2772 let out_location = node.metadata().location_id.clone();
2773 match node {
2774 HydroNode::Placeholder => {
2775 panic!()
2776 }
2777
2778 HydroNode::Cast { .. } => {
2779 match builders_or_callback {
2782 BuildersOrCallback::Builders(_) => {}
2783 BuildersOrCallback::Callback(_, node_callback) => {
2784 node_callback(node, next_stmt_id);
2785 }
2786 }
2787
2788 *next_stmt_id += 1;
2789 }
2791
2792 HydroNode::ObserveNonDet {
2793 inner,
2794 trusted,
2795 metadata,
2796 ..
2797 } => {
2798 let inner_ident = ident_stack.pop().unwrap();
2799
2800 let observe_ident =
2801 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2802
2803 match builders_or_callback {
2804 BuildersOrCallback::Builders(graph_builders) => {
2805 graph_builders.observe_nondet(
2806 *trusted,
2807 &inner.metadata().location_id,
2808 inner_ident,
2809 &inner.metadata().collection_kind,
2810 &observe_ident,
2811 &metadata.collection_kind,
2812 &metadata.op,
2813 );
2814 }
2815 BuildersOrCallback::Callback(_, node_callback) => {
2816 node_callback(node, next_stmt_id);
2817 }
2818 }
2819
2820 *next_stmt_id += 1;
2821
2822 ident_stack.push(observe_ident);
2823 }
2824
2825 HydroNode::Batch {
2826 inner, metadata, ..
2827 } => {
2828 let inner_ident = ident_stack.pop().unwrap();
2829
2830 let batch_ident =
2831 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2832
2833 match builders_or_callback {
2834 BuildersOrCallback::Builders(graph_builders) => {
2835 graph_builders.batch(
2836 inner_ident,
2837 &inner.metadata().location_id,
2838 &inner.metadata().collection_kind,
2839 &batch_ident,
2840 &out_location,
2841 &metadata.op,
2842 );
2843 }
2844 BuildersOrCallback::Callback(_, node_callback) => {
2845 node_callback(node, next_stmt_id);
2846 }
2847 }
2848
2849 *next_stmt_id += 1;
2850
2851 ident_stack.push(batch_ident);
2852 }
2853
2854 HydroNode::YieldConcat { inner, .. } => {
2855 let inner_ident = ident_stack.pop().unwrap();
2856
2857 let yield_ident =
2858 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2859
2860 match builders_or_callback {
2861 BuildersOrCallback::Builders(graph_builders) => {
2862 graph_builders.yield_from_tick(
2863 inner_ident,
2864 &inner.metadata().location_id,
2865 &inner.metadata().collection_kind,
2866 &yield_ident,
2867 &out_location,
2868 );
2869 }
2870 BuildersOrCallback::Callback(_, node_callback) => {
2871 node_callback(node, next_stmt_id);
2872 }
2873 }
2874
2875 *next_stmt_id += 1;
2876
2877 ident_stack.push(yield_ident);
2878 }
2879
2880 HydroNode::BeginAtomic { inner, metadata } => {
2881 let inner_ident = ident_stack.pop().unwrap();
2882
2883 let begin_ident =
2884 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2885
2886 match builders_or_callback {
2887 BuildersOrCallback::Builders(graph_builders) => {
2888 graph_builders.begin_atomic(
2889 inner_ident,
2890 &inner.metadata().location_id,
2891 &inner.metadata().collection_kind,
2892 &begin_ident,
2893 &out_location,
2894 &metadata.op,
2895 );
2896 }
2897 BuildersOrCallback::Callback(_, node_callback) => {
2898 node_callback(node, next_stmt_id);
2899 }
2900 }
2901
2902 *next_stmt_id += 1;
2903
2904 ident_stack.push(begin_ident);
2905 }
2906
2907 HydroNode::EndAtomic { inner, .. } => {
2908 let inner_ident = ident_stack.pop().unwrap();
2909
2910 let end_ident =
2911 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2912
2913 match builders_or_callback {
2914 BuildersOrCallback::Builders(graph_builders) => {
2915 graph_builders.end_atomic(
2916 inner_ident,
2917 &inner.metadata().location_id,
2918 &inner.metadata().collection_kind,
2919 &end_ident,
2920 );
2921 }
2922 BuildersOrCallback::Callback(_, node_callback) => {
2923 node_callback(node, next_stmt_id);
2924 }
2925 }
2926
2927 *next_stmt_id += 1;
2928
2929 ident_stack.push(end_ident);
2930 }
2931
2932 HydroNode::Source {
2933 source, metadata, ..
2934 } => {
2935 if let HydroSource::ExternalNetwork() = source {
2936 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2937 } else {
2938 let source_ident =
2939 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2940
2941 let source_stmt = match source {
2942 HydroSource::Stream(expr) => {
2943 debug_assert!(metadata.location_id.is_top_level());
2944 parse_quote! {
2945 #source_ident = source_stream(#expr);
2946 }
2947 }
2948
2949 HydroSource::ExternalNetwork() => {
2950 unreachable!()
2951 }
2952
2953 HydroSource::Iter(expr) => {
2954 if metadata.location_id.is_top_level() {
2955 parse_quote! {
2956 #source_ident = source_iter(#expr);
2957 }
2958 } else {
2959 parse_quote! {
2961 #source_ident = source_iter(#expr) -> persist::<'static>();
2962 }
2963 }
2964 }
2965
2966 HydroSource::Spin() => {
2967 debug_assert!(metadata.location_id.is_top_level());
2968 parse_quote! {
2969 #source_ident = spin();
2970 }
2971 }
2972
2973 HydroSource::ClusterMembers(target_loc, state) => {
2974 debug_assert!(metadata.location_id.is_top_level());
2975
2976 let members_tee_ident = syn::Ident::new(
2977 &format!(
2978 "__cluster_members_tee_{}_{}",
2979 metadata.location_id.root().key(),
2980 target_loc.key(),
2981 ),
2982 Span::call_site(),
2983 );
2984
2985 match state {
2986 ClusterMembersState::Stream(d) => {
2987 parse_quote! {
2988 #members_tee_ident = source_stream(#d) -> tee();
2989 #source_ident = #members_tee_ident;
2990 }
2991 },
2992 ClusterMembersState::Uninit => syn::parse_quote! {
2993 #source_ident = source_stream(DUMMY);
2994 },
2995 ClusterMembersState::Tee(..) => parse_quote! {
2996 #source_ident = #members_tee_ident;
2997 },
2998 }
2999 }
3000
3001 HydroSource::Embedded(ident) => {
3002 parse_quote! {
3003 #source_ident = source_stream(#ident);
3004 }
3005 }
3006
3007 HydroSource::EmbeddedSingleton(ident) => {
3008 parse_quote! {
3009 #source_ident = source_iter([#ident]);
3010 }
3011 }
3012 };
3013
3014 match builders_or_callback {
3015 BuildersOrCallback::Builders(graph_builders) => {
3016 let builder = graph_builders.get_dfir_mut(&out_location);
3017 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
3018 }
3019 BuildersOrCallback::Callback(_, node_callback) => {
3020 node_callback(node, next_stmt_id);
3021 }
3022 }
3023
3024 *next_stmt_id += 1;
3025
3026 ident_stack.push(source_ident);
3027 }
3028 }
3029
3030 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3031 let source_ident =
3032 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3033
3034 match builders_or_callback {
3035 BuildersOrCallback::Builders(graph_builders) => {
3036 let builder = graph_builders.get_dfir_mut(&out_location);
3037
3038 if *first_tick_only {
3039 assert!(
3040 !metadata.location_id.is_top_level(),
3041 "first_tick_only SingletonSource must be inside a tick"
3042 );
3043 }
3044
3045 if *first_tick_only
3046 || (metadata.location_id.is_top_level()
3047 && metadata.collection_kind.is_bounded())
3048 {
3049 builder.add_dfir(
3050 parse_quote! {
3051 #source_ident = source_iter([#value]);
3052 },
3053 None,
3054 Some(&next_stmt_id.to_string()),
3055 );
3056 } else {
3057 builder.add_dfir(
3058 parse_quote! {
3059 #source_ident = source_iter([#value]) -> persist::<'static>();
3060 },
3061 None,
3062 Some(&next_stmt_id.to_string()),
3063 );
3064 }
3065 }
3066 BuildersOrCallback::Callback(_, node_callback) => {
3067 node_callback(node, next_stmt_id);
3068 }
3069 }
3070
3071 *next_stmt_id += 1;
3072
3073 ident_stack.push(source_ident);
3074 }
3075
3076 HydroNode::CycleSource { cycle_id, .. } => {
3077 let ident = cycle_id.as_ident();
3078
3079 match builders_or_callback {
3080 BuildersOrCallback::Builders(_) => {}
3081 BuildersOrCallback::Callback(_, node_callback) => {
3082 node_callback(node, next_stmt_id);
3083 }
3084 }
3085
3086 *next_stmt_id += 1;
3088
3089 ident_stack.push(ident);
3090 }
3091
3092 HydroNode::Tee { inner, .. } => {
3093 let ret_ident = if let Some(built_idents) =
3094 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3095 {
3096 match builders_or_callback {
3097 BuildersOrCallback::Builders(_) => {}
3098 BuildersOrCallback::Callback(_, node_callback) => {
3099 node_callback(node, next_stmt_id);
3100 }
3101 }
3102
3103 built_idents[0].clone()
3104 } else {
3105 let inner_ident = ident_stack.pop().unwrap();
3108
3109 let tee_ident =
3110 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3111
3112 built_tees.insert(
3113 inner.0.as_ref() as *const RefCell<HydroNode>,
3114 vec![tee_ident.clone()],
3115 );
3116
3117 match builders_or_callback {
3118 BuildersOrCallback::Builders(graph_builders) => {
3119 let builder = graph_builders.get_dfir_mut(&out_location);
3120 builder.add_dfir(
3121 parse_quote! {
3122 #tee_ident = #inner_ident -> tee();
3123 },
3124 None,
3125 Some(&next_stmt_id.to_string()),
3126 );
3127 }
3128 BuildersOrCallback::Callback(_, node_callback) => {
3129 node_callback(node, next_stmt_id);
3130 }
3131 }
3132
3133 tee_ident
3134 };
3135
3136 *next_stmt_id += 1;
3140 ident_stack.push(ret_ident);
3141 }
3142
3143 HydroNode::Partition {
3144 inner, f, is_true, ..
3145 } => {
3146 let is_true = *is_true; let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3148 let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3149 match builders_or_callback {
3150 BuildersOrCallback::Builders(_) => {}
3151 BuildersOrCallback::Callback(_, node_callback) => {
3152 node_callback(node, next_stmt_id);
3153 }
3154 }
3155
3156 let idx = if is_true { 0 } else { 1 };
3157 built_idents[idx].clone()
3158 } else {
3159 let inner_ident = ident_stack.pop().unwrap();
3162
3163 let partition_ident = syn::Ident::new(
3164 &format!("stream_{}_partition", *next_stmt_id),
3165 Span::call_site(),
3166 );
3167 let true_ident = syn::Ident::new(
3168 &format!("stream_{}_true", *next_stmt_id),
3169 Span::call_site(),
3170 );
3171 let false_ident = syn::Ident::new(
3172 &format!("stream_{}_false", *next_stmt_id),
3173 Span::call_site(),
3174 );
3175
3176 built_tees.insert(
3177 ptr,
3178 vec![true_ident.clone(), false_ident.clone()],
3179 );
3180
3181 match builders_or_callback {
3182 BuildersOrCallback::Builders(graph_builders) => {
3183 let builder = graph_builders.get_dfir_mut(&out_location);
3184 builder.add_dfir(
3185 parse_quote! {
3186 #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f)(__item) { 0_usize } else { 1_usize });
3187 #true_ident = #partition_ident[0];
3188 #false_ident = #partition_ident[1];
3189 },
3190 None,
3191 Some(&next_stmt_id.to_string()),
3192 );
3193 }
3194 BuildersOrCallback::Callback(_, node_callback) => {
3195 node_callback(node, next_stmt_id);
3196 }
3197 }
3198
3199 if is_true { true_ident } else { false_ident }
3200 };
3201
3202 *next_stmt_id += 1;
3203 ident_stack.push(ret_ident);
3204 }
3205
3206 HydroNode::Chain { .. } => {
3207 let second_ident = ident_stack.pop().unwrap();
3209 let first_ident = ident_stack.pop().unwrap();
3210
3211 let chain_ident =
3212 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3213
3214 match builders_or_callback {
3215 BuildersOrCallback::Builders(graph_builders) => {
3216 let builder = graph_builders.get_dfir_mut(&out_location);
3217 builder.add_dfir(
3218 parse_quote! {
3219 #chain_ident = chain();
3220 #first_ident -> [0]#chain_ident;
3221 #second_ident -> [1]#chain_ident;
3222 },
3223 None,
3224 Some(&next_stmt_id.to_string()),
3225 );
3226 }
3227 BuildersOrCallback::Callback(_, node_callback) => {
3228 node_callback(node, next_stmt_id);
3229 }
3230 }
3231
3232 *next_stmt_id += 1;
3233
3234 ident_stack.push(chain_ident);
3235 }
3236
3237 HydroNode::MergeOrdered { first, metadata, .. } => {
3238 let second_ident = ident_stack.pop().unwrap();
3239 let first_ident = ident_stack.pop().unwrap();
3240
3241 let merge_ident =
3242 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3243
3244 match builders_or_callback {
3245 BuildersOrCallback::Builders(graph_builders) => {
3246 graph_builders.merge_ordered(
3247 &first.metadata().location_id,
3248 first_ident,
3249 second_ident,
3250 &merge_ident,
3251 &first.metadata().collection_kind,
3252 &metadata.op,
3253 Some(&next_stmt_id.to_string()),
3254 );
3255 }
3256 BuildersOrCallback::Callback(_, node_callback) => {
3257 node_callback(node, next_stmt_id);
3258 }
3259 }
3260
3261 *next_stmt_id += 1;
3262
3263 ident_stack.push(merge_ident);
3264 }
3265
3266 HydroNode::ChainFirst { .. } => {
3267 let second_ident = ident_stack.pop().unwrap();
3268 let first_ident = ident_stack.pop().unwrap();
3269
3270 let chain_ident =
3271 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3272
3273 match builders_or_callback {
3274 BuildersOrCallback::Builders(graph_builders) => {
3275 let builder = graph_builders.get_dfir_mut(&out_location);
3276 builder.add_dfir(
3277 parse_quote! {
3278 #chain_ident = chain_first_n(1);
3279 #first_ident -> [0]#chain_ident;
3280 #second_ident -> [1]#chain_ident;
3281 },
3282 None,
3283 Some(&next_stmt_id.to_string()),
3284 );
3285 }
3286 BuildersOrCallback::Callback(_, node_callback) => {
3287 node_callback(node, next_stmt_id);
3288 }
3289 }
3290
3291 *next_stmt_id += 1;
3292
3293 ident_stack.push(chain_ident);
3294 }
3295
3296 HydroNode::CrossSingleton { right, .. } => {
3297 let right_ident = ident_stack.pop().unwrap();
3298 let left_ident = ident_stack.pop().unwrap();
3299
3300 let cross_ident =
3301 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3302
3303 match builders_or_callback {
3304 BuildersOrCallback::Builders(graph_builders) => {
3305 let builder = graph_builders.get_dfir_mut(&out_location);
3306
3307 if right.metadata().location_id.is_top_level()
3308 && right.metadata().collection_kind.is_bounded()
3309 {
3310 builder.add_dfir(
3311 parse_quote! {
3312 #cross_ident = cross_singleton();
3313 #left_ident -> [input]#cross_ident;
3314 #right_ident -> persist::<'static>() -> [single]#cross_ident;
3315 },
3316 None,
3317 Some(&next_stmt_id.to_string()),
3318 );
3319 } else {
3320 builder.add_dfir(
3321 parse_quote! {
3322 #cross_ident = cross_singleton();
3323 #left_ident -> [input]#cross_ident;
3324 #right_ident -> [single]#cross_ident;
3325 },
3326 None,
3327 Some(&next_stmt_id.to_string()),
3328 );
3329 }
3330 }
3331 BuildersOrCallback::Callback(_, node_callback) => {
3332 node_callback(node, next_stmt_id);
3333 }
3334 }
3335
3336 *next_stmt_id += 1;
3337
3338 ident_stack.push(cross_ident);
3339 }
3340
3341 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3342 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3343 parse_quote!(cross_join_multiset)
3344 } else {
3345 parse_quote!(join_multiset)
3346 };
3347
3348 let (HydroNode::CrossProduct { left, right, .. }
3349 | HydroNode::Join { left, right, .. }) = node
3350 else {
3351 unreachable!()
3352 };
3353
3354 let is_top_level = left.metadata().location_id.is_top_level()
3355 && right.metadata().location_id.is_top_level();
3356 let left_lifetime = if left.metadata().location_id.is_top_level() {
3357 quote!('static)
3358 } else {
3359 quote!('tick)
3360 };
3361
3362 let right_lifetime = if right.metadata().location_id.is_top_level() {
3363 quote!('static)
3364 } else {
3365 quote!('tick)
3366 };
3367
3368 let right_ident = ident_stack.pop().unwrap();
3369 let left_ident = ident_stack.pop().unwrap();
3370
3371 let stream_ident =
3372 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3373
3374 match builders_or_callback {
3375 BuildersOrCallback::Builders(graph_builders) => {
3376 let builder = graph_builders.get_dfir_mut(&out_location);
3377 builder.add_dfir(
3378 if is_top_level {
3379 parse_quote! {
3382 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3383 #left_ident -> [0]#stream_ident;
3384 #right_ident -> [1]#stream_ident;
3385 }
3386 } else {
3387 parse_quote! {
3388 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3389 #left_ident -> [0]#stream_ident;
3390 #right_ident -> [1]#stream_ident;
3391 }
3392 }
3393 ,
3394 None,
3395 Some(&next_stmt_id.to_string()),
3396 );
3397 }
3398 BuildersOrCallback::Callback(_, node_callback) => {
3399 node_callback(node, next_stmt_id);
3400 }
3401 }
3402
3403 *next_stmt_id += 1;
3404
3405 ident_stack.push(stream_ident);
3406 }
3407
3408 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3409 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3410 parse_quote!(difference)
3411 } else {
3412 parse_quote!(anti_join)
3413 };
3414
3415 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3416 node
3417 else {
3418 unreachable!()
3419 };
3420
3421 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3422 quote!('static)
3423 } else {
3424 quote!('tick)
3425 };
3426
3427 let neg_ident = ident_stack.pop().unwrap();
3428 let pos_ident = ident_stack.pop().unwrap();
3429
3430 let stream_ident =
3431 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3432
3433 match builders_or_callback {
3434 BuildersOrCallback::Builders(graph_builders) => {
3435 let builder = graph_builders.get_dfir_mut(&out_location);
3436 builder.add_dfir(
3437 parse_quote! {
3438 #stream_ident = #operator::<'tick, #neg_lifetime>();
3439 #pos_ident -> [pos]#stream_ident;
3440 #neg_ident -> [neg]#stream_ident;
3441 },
3442 None,
3443 Some(&next_stmt_id.to_string()),
3444 );
3445 }
3446 BuildersOrCallback::Callback(_, node_callback) => {
3447 node_callback(node, next_stmt_id);
3448 }
3449 }
3450
3451 *next_stmt_id += 1;
3452
3453 ident_stack.push(stream_ident);
3454 }
3455
3456 HydroNode::JoinHalf { .. } => {
3457 let HydroNode::JoinHalf { right, .. } = node else {
3458 unreachable!()
3459 };
3460
3461 assert!(
3462 right.metadata().collection_kind.is_bounded(),
3463 "JoinHalf requires the right (build) side to be Bounded, got {:?}",
3464 right.metadata().collection_kind
3465 );
3466
3467 let build_lifetime = if right.metadata().location_id.is_top_level() {
3468 quote!('static)
3469 } else {
3470 quote!('tick)
3471 };
3472
3473 let build_ident = ident_stack.pop().unwrap();
3474 let probe_ident = ident_stack.pop().unwrap();
3475
3476 let stream_ident =
3477 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3478
3479 match builders_or_callback {
3480 BuildersOrCallback::Builders(graph_builders) => {
3481 let builder = graph_builders.get_dfir_mut(&out_location);
3482 builder.add_dfir(
3483 parse_quote! {
3484 #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
3485 #probe_ident -> [probe]#stream_ident;
3486 #build_ident -> [build]#stream_ident;
3487 },
3488 None,
3489 Some(&next_stmt_id.to_string()),
3490 );
3491 }
3492 BuildersOrCallback::Callback(_, node_callback) => {
3493 node_callback(node, next_stmt_id);
3494 }
3495 }
3496
3497 *next_stmt_id += 1;
3498
3499 ident_stack.push(stream_ident);
3500 }
3501
3502 HydroNode::ResolveFutures { .. } => {
3503 let input_ident = ident_stack.pop().unwrap();
3504
3505 let futures_ident =
3506 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3507
3508 match builders_or_callback {
3509 BuildersOrCallback::Builders(graph_builders) => {
3510 let builder = graph_builders.get_dfir_mut(&out_location);
3511 builder.add_dfir(
3512 parse_quote! {
3513 #futures_ident = #input_ident -> resolve_futures();
3514 },
3515 None,
3516 Some(&next_stmt_id.to_string()),
3517 );
3518 }
3519 BuildersOrCallback::Callback(_, node_callback) => {
3520 node_callback(node, next_stmt_id);
3521 }
3522 }
3523
3524 *next_stmt_id += 1;
3525
3526 ident_stack.push(futures_ident);
3527 }
3528
3529 HydroNode::ResolveFuturesBlocking { .. } => {
3530 let input_ident = ident_stack.pop().unwrap();
3531
3532 let futures_ident =
3533 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3534
3535 match builders_or_callback {
3536 BuildersOrCallback::Builders(graph_builders) => {
3537 let builder = graph_builders.get_dfir_mut(&out_location);
3538 builder.add_dfir(
3539 parse_quote! {
3540 #futures_ident = #input_ident -> resolve_futures_blocking();
3541 },
3542 None,
3543 Some(&next_stmt_id.to_string()),
3544 );
3545 }
3546 BuildersOrCallback::Callback(_, node_callback) => {
3547 node_callback(node, next_stmt_id);
3548 }
3549 }
3550
3551 *next_stmt_id += 1;
3552
3553 ident_stack.push(futures_ident);
3554 }
3555
3556 HydroNode::ResolveFuturesOrdered { .. } => {
3557 let input_ident = ident_stack.pop().unwrap();
3558
3559 let futures_ident =
3560 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3561
3562 match builders_or_callback {
3563 BuildersOrCallback::Builders(graph_builders) => {
3564 let builder = graph_builders.get_dfir_mut(&out_location);
3565 builder.add_dfir(
3566 parse_quote! {
3567 #futures_ident = #input_ident -> resolve_futures_ordered();
3568 },
3569 None,
3570 Some(&next_stmt_id.to_string()),
3571 );
3572 }
3573 BuildersOrCallback::Callback(_, node_callback) => {
3574 node_callback(node, next_stmt_id);
3575 }
3576 }
3577
3578 *next_stmt_id += 1;
3579
3580 ident_stack.push(futures_ident);
3581 }
3582
3583 HydroNode::Map { f, .. } => {
3584 let input_ident = ident_stack.pop().unwrap();
3585
3586 let map_ident =
3587 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3588
3589 match builders_or_callback {
3590 BuildersOrCallback::Builders(graph_builders) => {
3591 let builder = graph_builders.get_dfir_mut(&out_location);
3592 builder.add_dfir(
3593 parse_quote! {
3594 #map_ident = #input_ident -> map(#f);
3595 },
3596 None,
3597 Some(&next_stmt_id.to_string()),
3598 );
3599 }
3600 BuildersOrCallback::Callback(_, node_callback) => {
3601 node_callback(node, next_stmt_id);
3602 }
3603 }
3604
3605 *next_stmt_id += 1;
3606
3607 ident_stack.push(map_ident);
3608 }
3609
3610 HydroNode::FlatMap { f, .. } => {
3611 let input_ident = ident_stack.pop().unwrap();
3612
3613 let flat_map_ident =
3614 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3615
3616 match builders_or_callback {
3617 BuildersOrCallback::Builders(graph_builders) => {
3618 let builder = graph_builders.get_dfir_mut(&out_location);
3619 builder.add_dfir(
3620 parse_quote! {
3621 #flat_map_ident = #input_ident -> flat_map(#f);
3622 },
3623 None,
3624 Some(&next_stmt_id.to_string()),
3625 );
3626 }
3627 BuildersOrCallback::Callback(_, node_callback) => {
3628 node_callback(node, next_stmt_id);
3629 }
3630 }
3631
3632 *next_stmt_id += 1;
3633
3634 ident_stack.push(flat_map_ident);
3635 }
3636
3637 HydroNode::FlatMapStreamBlocking { f, .. } => {
3638 let input_ident = ident_stack.pop().unwrap();
3639
3640 let flat_map_stream_blocking_ident =
3641 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3642
3643 match builders_or_callback {
3644 BuildersOrCallback::Builders(graph_builders) => {
3645 let builder = graph_builders.get_dfir_mut(&out_location);
3646 builder.add_dfir(
3647 parse_quote! {
3648 #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f);
3649 },
3650 None,
3651 Some(&next_stmt_id.to_string()),
3652 );
3653 }
3654 BuildersOrCallback::Callback(_, node_callback) => {
3655 node_callback(node, next_stmt_id);
3656 }
3657 }
3658
3659 *next_stmt_id += 1;
3660
3661 ident_stack.push(flat_map_stream_blocking_ident);
3662 }
3663
3664 HydroNode::Filter { f, .. } => {
3665 let input_ident = ident_stack.pop().unwrap();
3666
3667 let filter_ident =
3668 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3669
3670 match builders_or_callback {
3671 BuildersOrCallback::Builders(graph_builders) => {
3672 let builder = graph_builders.get_dfir_mut(&out_location);
3673 builder.add_dfir(
3674 parse_quote! {
3675 #filter_ident = #input_ident -> filter(#f);
3676 },
3677 None,
3678 Some(&next_stmt_id.to_string()),
3679 );
3680 }
3681 BuildersOrCallback::Callback(_, node_callback) => {
3682 node_callback(node, next_stmt_id);
3683 }
3684 }
3685
3686 *next_stmt_id += 1;
3687
3688 ident_stack.push(filter_ident);
3689 }
3690
3691 HydroNode::FilterMap { f, .. } => {
3692 let input_ident = ident_stack.pop().unwrap();
3693
3694 let filter_map_ident =
3695 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3696
3697 match builders_or_callback {
3698 BuildersOrCallback::Builders(graph_builders) => {
3699 let builder = graph_builders.get_dfir_mut(&out_location);
3700 builder.add_dfir(
3701 parse_quote! {
3702 #filter_map_ident = #input_ident -> filter_map(#f);
3703 },
3704 None,
3705 Some(&next_stmt_id.to_string()),
3706 );
3707 }
3708 BuildersOrCallback::Callback(_, node_callback) => {
3709 node_callback(node, next_stmt_id);
3710 }
3711 }
3712
3713 *next_stmt_id += 1;
3714
3715 ident_stack.push(filter_map_ident);
3716 }
3717
3718 HydroNode::Sort { .. } => {
3719 let input_ident = ident_stack.pop().unwrap();
3720
3721 let sort_ident =
3722 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3723
3724 match builders_or_callback {
3725 BuildersOrCallback::Builders(graph_builders) => {
3726 let builder = graph_builders.get_dfir_mut(&out_location);
3727 builder.add_dfir(
3728 parse_quote! {
3729 #sort_ident = #input_ident -> sort();
3730 },
3731 None,
3732 Some(&next_stmt_id.to_string()),
3733 );
3734 }
3735 BuildersOrCallback::Callback(_, node_callback) => {
3736 node_callback(node, next_stmt_id);
3737 }
3738 }
3739
3740 *next_stmt_id += 1;
3741
3742 ident_stack.push(sort_ident);
3743 }
3744
3745 HydroNode::DeferTick { .. } => {
3746 let input_ident = ident_stack.pop().unwrap();
3747
3748 let defer_tick_ident =
3749 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3750
3751 match builders_or_callback {
3752 BuildersOrCallback::Builders(graph_builders) => {
3753 let builder = graph_builders.get_dfir_mut(&out_location);
3754 builder.add_dfir(
3755 parse_quote! {
3756 #defer_tick_ident = #input_ident -> defer_tick_lazy();
3757 },
3758 None,
3759 Some(&next_stmt_id.to_string()),
3760 );
3761 }
3762 BuildersOrCallback::Callback(_, node_callback) => {
3763 node_callback(node, next_stmt_id);
3764 }
3765 }
3766
3767 *next_stmt_id += 1;
3768
3769 ident_stack.push(defer_tick_ident);
3770 }
3771
3772 HydroNode::Enumerate { input, .. } => {
3773 let input_ident = ident_stack.pop().unwrap();
3774
3775 let enumerate_ident =
3776 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3777
3778 match builders_or_callback {
3779 BuildersOrCallback::Builders(graph_builders) => {
3780 let builder = graph_builders.get_dfir_mut(&out_location);
3781 let lifetime = if input.metadata().location_id.is_top_level() {
3782 quote!('static)
3783 } else {
3784 quote!('tick)
3785 };
3786 builder.add_dfir(
3787 parse_quote! {
3788 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3789 },
3790 None,
3791 Some(&next_stmt_id.to_string()),
3792 );
3793 }
3794 BuildersOrCallback::Callback(_, node_callback) => {
3795 node_callback(node, next_stmt_id);
3796 }
3797 }
3798
3799 *next_stmt_id += 1;
3800
3801 ident_stack.push(enumerate_ident);
3802 }
3803
3804 HydroNode::Inspect { f, .. } => {
3805 let input_ident = ident_stack.pop().unwrap();
3806
3807 let inspect_ident =
3808 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3809
3810 match builders_or_callback {
3811 BuildersOrCallback::Builders(graph_builders) => {
3812 let builder = graph_builders.get_dfir_mut(&out_location);
3813 builder.add_dfir(
3814 parse_quote! {
3815 #inspect_ident = #input_ident -> inspect(#f);
3816 },
3817 None,
3818 Some(&next_stmt_id.to_string()),
3819 );
3820 }
3821 BuildersOrCallback::Callback(_, node_callback) => {
3822 node_callback(node, next_stmt_id);
3823 }
3824 }
3825
3826 *next_stmt_id += 1;
3827
3828 ident_stack.push(inspect_ident);
3829 }
3830
3831 HydroNode::Unique { input, .. } => {
3832 let input_ident = ident_stack.pop().unwrap();
3833
3834 let unique_ident =
3835 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3836
3837 match builders_or_callback {
3838 BuildersOrCallback::Builders(graph_builders) => {
3839 let builder = graph_builders.get_dfir_mut(&out_location);
3840 let lifetime = if input.metadata().location_id.is_top_level() {
3841 quote!('static)
3842 } else {
3843 quote!('tick)
3844 };
3845
3846 builder.add_dfir(
3847 parse_quote! {
3848 #unique_ident = #input_ident -> unique::<#lifetime>();
3849 },
3850 None,
3851 Some(&next_stmt_id.to_string()),
3852 );
3853 }
3854 BuildersOrCallback::Callback(_, node_callback) => {
3855 node_callback(node, next_stmt_id);
3856 }
3857 }
3858
3859 *next_stmt_id += 1;
3860
3861 ident_stack.push(unique_ident);
3862 }
3863
3864 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
3865 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3866 if input.metadata().location_id.is_top_level()
3867 && input.metadata().collection_kind.is_bounded()
3868 {
3869 parse_quote!(fold_no_replay)
3870 } else {
3871 parse_quote!(fold)
3872 }
3873 } else if matches!(node, HydroNode::Scan { .. }) {
3874 parse_quote!(scan)
3875 } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
3876 parse_quote!(scan_async_blocking)
3877 } else if let HydroNode::FoldKeyed { input, .. } = node {
3878 if input.metadata().location_id.is_top_level()
3879 && input.metadata().collection_kind.is_bounded()
3880 {
3881 todo!("Fold keyed on a top-level bounded collection is not yet supported")
3882 } else {
3883 parse_quote!(fold_keyed)
3884 }
3885 } else {
3886 unreachable!()
3887 };
3888
3889 let (HydroNode::Fold { input, .. }
3890 | HydroNode::FoldKeyed { input, .. }
3891 | HydroNode::Scan { input, .. }
3892 | HydroNode::ScanAsyncBlocking { input, .. }) = node
3893 else {
3894 unreachable!()
3895 };
3896
3897 let lifetime = if input.metadata().location_id.is_top_level() {
3898 quote!('static)
3899 } else {
3900 quote!('tick)
3901 };
3902
3903 let input_ident = ident_stack.pop().unwrap();
3904
3905 let (HydroNode::Fold { init, acc, .. }
3906 | HydroNode::FoldKeyed { init, acc, .. }
3907 | HydroNode::Scan { init, acc, .. }
3908 | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
3909 else {
3910 unreachable!()
3911 };
3912
3913 let fold_ident =
3914 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3915
3916 match builders_or_callback {
3917 BuildersOrCallback::Builders(graph_builders) => {
3918 if matches!(node, HydroNode::Fold { .. })
3919 && node.metadata().location_id.is_top_level()
3920 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3921 && graph_builders.singleton_intermediates()
3922 && !node.metadata().collection_kind.is_bounded()
3923 {
3924 let builder = graph_builders.get_dfir_mut(&out_location);
3925
3926 let acc: syn::Expr = parse_quote!({
3927 let mut __inner = #acc;
3928 move |__state, __value| {
3929 __inner(__state, __value);
3930 Some(__state.clone())
3931 }
3932 });
3933
3934 builder.add_dfir(
3935 parse_quote! {
3936 source_iter([(#init)()]) -> [0]#fold_ident;
3937 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3938 #fold_ident = chain();
3939 },
3940 None,
3941 Some(&next_stmt_id.to_string()),
3942 );
3943 } else if matches!(node, HydroNode::FoldKeyed { .. })
3944 && node.metadata().location_id.is_top_level()
3945 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3946 && graph_builders.singleton_intermediates()
3947 && !node.metadata().collection_kind.is_bounded()
3948 {
3949 let builder = graph_builders.get_dfir_mut(&out_location);
3950
3951 let acc: syn::Expr = parse_quote!({
3952 let mut __init = #init;
3953 let mut __inner = #acc;
3954 move |__state, __kv: (_, _)| {
3955 let __state = __state
3957 .entry(::std::clone::Clone::clone(&__kv.0))
3958 .or_insert_with(|| (__init)());
3959 __inner(__state, __kv.1);
3960 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3961 }
3962 });
3963
3964 builder.add_dfir(
3965 parse_quote! {
3966 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3967 },
3968 None,
3969 Some(&next_stmt_id.to_string()),
3970 );
3971 } else {
3972 let builder = graph_builders.get_dfir_mut(&out_location);
3973 builder.add_dfir(
3974 parse_quote! {
3975 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3976 },
3977 None,
3978 Some(&next_stmt_id.to_string()),
3979 );
3980 }
3981 }
3982 BuildersOrCallback::Callback(_, node_callback) => {
3983 node_callback(node, next_stmt_id);
3984 }
3985 }
3986
3987 *next_stmt_id += 1;
3988
3989 ident_stack.push(fold_ident);
3990 }
3991
3992 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3993 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3994 if input.metadata().location_id.is_top_level()
3995 && input.metadata().collection_kind.is_bounded()
3996 {
3997 parse_quote!(reduce_no_replay)
3998 } else {
3999 parse_quote!(reduce)
4000 }
4001 } else if let HydroNode::ReduceKeyed { input, .. } = node {
4002 if input.metadata().location_id.is_top_level()
4003 && input.metadata().collection_kind.is_bounded()
4004 {
4005 todo!(
4006 "Calling keyed reduce on a top-level bounded collection is not supported"
4007 )
4008 } else {
4009 parse_quote!(reduce_keyed)
4010 }
4011 } else {
4012 unreachable!()
4013 };
4014
4015 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4016 else {
4017 unreachable!()
4018 };
4019
4020 let lifetime = if input.metadata().location_id.is_top_level() {
4021 quote!('static)
4022 } else {
4023 quote!('tick)
4024 };
4025
4026 let input_ident = ident_stack.pop().unwrap();
4027
4028 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4029 else {
4030 unreachable!()
4031 };
4032
4033 let reduce_ident =
4034 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4035
4036 match builders_or_callback {
4037 BuildersOrCallback::Builders(graph_builders) => {
4038 if matches!(node, HydroNode::Reduce { .. })
4039 && node.metadata().location_id.is_top_level()
4040 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4041 && graph_builders.singleton_intermediates()
4042 && !node.metadata().collection_kind.is_bounded()
4043 {
4044 todo!(
4045 "Reduce with optional intermediates is not yet supported in simulator"
4046 );
4047 } else if matches!(node, HydroNode::ReduceKeyed { .. })
4048 && node.metadata().location_id.is_top_level()
4049 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4050 && graph_builders.singleton_intermediates()
4051 && !node.metadata().collection_kind.is_bounded()
4052 {
4053 todo!(
4054 "Reduce keyed with optional intermediates is not yet supported in simulator"
4055 );
4056 } else {
4057 let builder = graph_builders.get_dfir_mut(&out_location);
4058 builder.add_dfir(
4059 parse_quote! {
4060 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
4061 },
4062 None,
4063 Some(&next_stmt_id.to_string()),
4064 );
4065 }
4066 }
4067 BuildersOrCallback::Callback(_, node_callback) => {
4068 node_callback(node, next_stmt_id);
4069 }
4070 }
4071
4072 *next_stmt_id += 1;
4073
4074 ident_stack.push(reduce_ident);
4075 }
4076
4077 HydroNode::ReduceKeyedWatermark {
4078 f,
4079 input,
4080 metadata,
4081 ..
4082 } => {
4083 let lifetime = if input.metadata().location_id.is_top_level() {
4084 quote!('static)
4085 } else {
4086 quote!('tick)
4087 };
4088
4089 let watermark_ident = ident_stack.pop().unwrap();
4091 let input_ident = ident_stack.pop().unwrap();
4092
4093 let chain_ident = syn::Ident::new(
4094 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
4095 Span::call_site(),
4096 );
4097
4098 let fold_ident =
4099 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4100
4101 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4102 && input.metadata().collection_kind.is_bounded()
4103 {
4104 parse_quote!(fold_no_replay)
4105 } else {
4106 parse_quote!(fold)
4107 };
4108
4109 match builders_or_callback {
4110 BuildersOrCallback::Builders(graph_builders) => {
4111 if metadata.location_id.is_top_level()
4112 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4113 && graph_builders.singleton_intermediates()
4114 && !metadata.collection_kind.is_bounded()
4115 {
4116 todo!(
4117 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4118 )
4119 } else {
4120 let builder = graph_builders.get_dfir_mut(&out_location);
4121 builder.add_dfir(
4122 parse_quote! {
4123 #chain_ident = chain();
4124 #input_ident
4125 -> map(|x| (Some(x), None))
4126 -> [0]#chain_ident;
4127 #watermark_ident
4128 -> map(|watermark| (None, Some(watermark)))
4129 -> [1]#chain_ident;
4130
4131 #fold_ident = #chain_ident
4132 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4133 let __reduce_keyed_fn = #f;
4134 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4135 if let Some((k, v)) = opt_payload {
4136 if let Some(curr_watermark) = *opt_curr_watermark {
4137 if k < curr_watermark {
4138 return;
4139 }
4140 }
4141 match map.entry(k) {
4142 ::std::collections::hash_map::Entry::Vacant(e) => {
4143 e.insert(v);
4144 }
4145 ::std::collections::hash_map::Entry::Occupied(mut e) => {
4146 __reduce_keyed_fn(e.get_mut(), v);
4147 }
4148 }
4149 } else {
4150 let watermark = opt_watermark.unwrap();
4151 if let Some(curr_watermark) = *opt_curr_watermark {
4152 if watermark <= curr_watermark {
4153 return;
4154 }
4155 }
4156 map.retain(|k, _| *k >= watermark);
4157 *opt_curr_watermark = Some(watermark);
4158 }
4159 }
4160 })
4161 -> flat_map(|(map, _curr_watermark)| map);
4162 },
4163 None,
4164 Some(&next_stmt_id.to_string()),
4165 );
4166 }
4167 }
4168 BuildersOrCallback::Callback(_, node_callback) => {
4169 node_callback(node, next_stmt_id);
4170 }
4171 }
4172
4173 *next_stmt_id += 1;
4174
4175 ident_stack.push(fold_ident);
4176 }
4177
4178 HydroNode::Network {
4179 networking_info,
4180 serialize_fn: serialize_pipeline,
4181 instantiate_fn,
4182 deserialize_fn: deserialize_pipeline,
4183 input,
4184 ..
4185 } => {
4186 let input_ident = ident_stack.pop().unwrap();
4187
4188 let receiver_stream_ident =
4189 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4190
4191 match builders_or_callback {
4192 BuildersOrCallback::Builders(graph_builders) => {
4193 let (sink_expr, source_expr) = match instantiate_fn {
4194 DebugInstantiate::Building => (
4195 syn::parse_quote!(DUMMY_SINK),
4196 syn::parse_quote!(DUMMY_SOURCE),
4197 ),
4198
4199 DebugInstantiate::Finalized(finalized) => {
4200 (finalized.sink.clone(), finalized.source.clone())
4201 }
4202 };
4203
4204 graph_builders.create_network(
4205 &input.metadata().location_id,
4206 &out_location,
4207 input_ident,
4208 &receiver_stream_ident,
4209 serialize_pipeline.as_ref(),
4210 sink_expr,
4211 source_expr,
4212 deserialize_pipeline.as_ref(),
4213 *next_stmt_id,
4214 networking_info,
4215 );
4216 }
4217 BuildersOrCallback::Callback(_, node_callback) => {
4218 node_callback(node, next_stmt_id);
4219 }
4220 }
4221
4222 *next_stmt_id += 1;
4223
4224 ident_stack.push(receiver_stream_ident);
4225 }
4226
4227 HydroNode::ExternalInput {
4228 instantiate_fn,
4229 deserialize_fn: deserialize_pipeline,
4230 ..
4231 } => {
4232 let receiver_stream_ident =
4233 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4234
4235 match builders_or_callback {
4236 BuildersOrCallback::Builders(graph_builders) => {
4237 let (_, source_expr) = match instantiate_fn {
4238 DebugInstantiate::Building => (
4239 syn::parse_quote!(DUMMY_SINK),
4240 syn::parse_quote!(DUMMY_SOURCE),
4241 ),
4242
4243 DebugInstantiate::Finalized(finalized) => {
4244 (finalized.sink.clone(), finalized.source.clone())
4245 }
4246 };
4247
4248 graph_builders.create_external_source(
4249 &out_location,
4250 source_expr,
4251 &receiver_stream_ident,
4252 deserialize_pipeline.as_ref(),
4253 *next_stmt_id,
4254 );
4255 }
4256 BuildersOrCallback::Callback(_, node_callback) => {
4257 node_callback(node, next_stmt_id);
4258 }
4259 }
4260
4261 *next_stmt_id += 1;
4262
4263 ident_stack.push(receiver_stream_ident);
4264 }
4265
4266 HydroNode::Counter {
4267 tag,
4268 duration,
4269 prefix,
4270 ..
4271 } => {
4272 let input_ident = ident_stack.pop().unwrap();
4273
4274 let counter_ident =
4275 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4276
4277 match builders_or_callback {
4278 BuildersOrCallback::Builders(graph_builders) => {
4279 let arg = format!("{}({})", prefix, tag);
4280 let builder = graph_builders.get_dfir_mut(&out_location);
4281 builder.add_dfir(
4282 parse_quote! {
4283 #counter_ident = #input_ident -> _counter(#arg, #duration);
4284 },
4285 None,
4286 Some(&next_stmt_id.to_string()),
4287 );
4288 }
4289 BuildersOrCallback::Callback(_, node_callback) => {
4290 node_callback(node, next_stmt_id);
4291 }
4292 }
4293
4294 *next_stmt_id += 1;
4295
4296 ident_stack.push(counter_ident);
4297 }
4298 }
4299 },
4300 seen_tees,
4301 false,
4302 );
4303
4304 ident_stack
4305 .pop()
4306 .expect("ident_stack should have exactly one element after traversal")
4307 }
4308
4309 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4310 match self {
4311 HydroNode::Placeholder => {
4312 panic!()
4313 }
4314 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
4315 HydroNode::Source { source, .. } => match source {
4316 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4317 HydroSource::ExternalNetwork()
4318 | HydroSource::Spin()
4319 | HydroSource::ClusterMembers(_, _)
4320 | HydroSource::Embedded(_)
4321 | HydroSource::EmbeddedSingleton(_) => {} },
4323 HydroNode::SingletonSource { value, .. } => {
4324 transform(value);
4325 }
4326 HydroNode::CycleSource { .. }
4327 | HydroNode::Tee { .. }
4328 | HydroNode::YieldConcat { .. }
4329 | HydroNode::BeginAtomic { .. }
4330 | HydroNode::EndAtomic { .. }
4331 | HydroNode::Batch { .. }
4332 | HydroNode::Chain { .. }
4333 | HydroNode::MergeOrdered { .. }
4334 | HydroNode::ChainFirst { .. }
4335 | HydroNode::CrossProduct { .. }
4336 | HydroNode::CrossSingleton { .. }
4337 | HydroNode::ResolveFutures { .. }
4338 | HydroNode::ResolveFuturesBlocking { .. }
4339 | HydroNode::ResolveFuturesOrdered { .. }
4340 | HydroNode::Join { .. }
4341 | HydroNode::JoinHalf { .. }
4342 | HydroNode::Difference { .. }
4343 | HydroNode::AntiJoin { .. }
4344 | HydroNode::DeferTick { .. }
4345 | HydroNode::Enumerate { .. }
4346 | HydroNode::Unique { .. }
4347 | HydroNode::Sort { .. } => {}
4348 HydroNode::Map { f, .. }
4349 | HydroNode::FlatMap { f, .. }
4350 | HydroNode::FlatMapStreamBlocking { f, .. }
4351 | HydroNode::Filter { f, .. }
4352 | HydroNode::FilterMap { f, .. }
4353 | HydroNode::Inspect { f, .. }
4354 | HydroNode::Partition { f, .. }
4355 | HydroNode::Reduce { f, .. }
4356 | HydroNode::ReduceKeyed { f, .. }
4357 | HydroNode::ReduceKeyedWatermark { f, .. } => {
4358 transform(f);
4359 }
4360 HydroNode::Fold { init, acc, .. }
4361 | HydroNode::Scan { init, acc, .. }
4362 | HydroNode::ScanAsyncBlocking { init, acc, .. }
4363 | HydroNode::FoldKeyed { init, acc, .. } => {
4364 transform(init);
4365 transform(acc);
4366 }
4367 HydroNode::Network {
4368 serialize_fn,
4369 deserialize_fn,
4370 ..
4371 } => {
4372 if let Some(serialize_fn) = serialize_fn {
4373 transform(serialize_fn);
4374 }
4375 if let Some(deserialize_fn) = deserialize_fn {
4376 transform(deserialize_fn);
4377 }
4378 }
4379 HydroNode::ExternalInput { deserialize_fn, .. } => {
4380 if let Some(deserialize_fn) = deserialize_fn {
4381 transform(deserialize_fn);
4382 }
4383 }
4384 HydroNode::Counter { duration, .. } => {
4385 transform(duration);
4386 }
4387 }
4388 }
4389
4390 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4391 &self.metadata().op
4392 }
4393
4394 pub fn metadata(&self) -> &HydroIrMetadata {
4395 match self {
4396 HydroNode::Placeholder => {
4397 panic!()
4398 }
4399 HydroNode::Cast { metadata, .. } => metadata,
4400 HydroNode::ObserveNonDet { metadata, .. } => metadata,
4401 HydroNode::Source { metadata, .. } => metadata,
4402 HydroNode::SingletonSource { metadata, .. } => metadata,
4403 HydroNode::CycleSource { metadata, .. } => metadata,
4404 HydroNode::Tee { metadata, .. } => metadata,
4405 HydroNode::Partition { metadata, .. } => metadata,
4406 HydroNode::YieldConcat { metadata, .. } => metadata,
4407 HydroNode::BeginAtomic { metadata, .. } => metadata,
4408 HydroNode::EndAtomic { metadata, .. } => metadata,
4409 HydroNode::Batch { metadata, .. } => metadata,
4410 HydroNode::Chain { metadata, .. } => metadata,
4411 HydroNode::MergeOrdered { metadata, .. } => metadata,
4412 HydroNode::ChainFirst { metadata, .. } => metadata,
4413 HydroNode::CrossProduct { metadata, .. } => metadata,
4414 HydroNode::CrossSingleton { metadata, .. } => metadata,
4415 HydroNode::Join { metadata, .. } => metadata,
4416 HydroNode::JoinHalf { metadata, .. } => metadata,
4417 HydroNode::Difference { metadata, .. } => metadata,
4418 HydroNode::AntiJoin { metadata, .. } => metadata,
4419 HydroNode::ResolveFutures { metadata, .. } => metadata,
4420 HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4421 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4422 HydroNode::Map { metadata, .. } => metadata,
4423 HydroNode::FlatMap { metadata, .. } => metadata,
4424 HydroNode::FlatMapStreamBlocking { metadata, .. } => metadata,
4425 HydroNode::Filter { metadata, .. } => metadata,
4426 HydroNode::FilterMap { metadata, .. } => metadata,
4427 HydroNode::DeferTick { metadata, .. } => metadata,
4428 HydroNode::Enumerate { metadata, .. } => metadata,
4429 HydroNode::Inspect { metadata, .. } => metadata,
4430 HydroNode::Unique { metadata, .. } => metadata,
4431 HydroNode::Sort { metadata, .. } => metadata,
4432 HydroNode::Scan { metadata, .. } => metadata,
4433 HydroNode::ScanAsyncBlocking { metadata, .. } => metadata,
4434 HydroNode::Fold { metadata, .. } => metadata,
4435 HydroNode::FoldKeyed { metadata, .. } => metadata,
4436 HydroNode::Reduce { metadata, .. } => metadata,
4437 HydroNode::ReduceKeyed { metadata, .. } => metadata,
4438 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4439 HydroNode::ExternalInput { metadata, .. } => metadata,
4440 HydroNode::Network { metadata, .. } => metadata,
4441 HydroNode::Counter { metadata, .. } => metadata,
4442 }
4443 }
4444
4445 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
4446 &mut self.metadata_mut().op
4447 }
4448
4449 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
4450 match self {
4451 HydroNode::Placeholder => {
4452 panic!()
4453 }
4454 HydroNode::Cast { metadata, .. } => metadata,
4455 HydroNode::ObserveNonDet { metadata, .. } => metadata,
4456 HydroNode::Source { metadata, .. } => metadata,
4457 HydroNode::SingletonSource { metadata, .. } => metadata,
4458 HydroNode::CycleSource { metadata, .. } => metadata,
4459 HydroNode::Tee { metadata, .. } => metadata,
4460 HydroNode::Partition { metadata, .. } => metadata,
4461 HydroNode::YieldConcat { metadata, .. } => metadata,
4462 HydroNode::BeginAtomic { metadata, .. } => metadata,
4463 HydroNode::EndAtomic { metadata, .. } => metadata,
4464 HydroNode::Batch { metadata, .. } => metadata,
4465 HydroNode::Chain { metadata, .. } => metadata,
4466 HydroNode::MergeOrdered { metadata, .. } => metadata,
4467 HydroNode::ChainFirst { metadata, .. } => metadata,
4468 HydroNode::CrossProduct { metadata, .. } => metadata,
4469 HydroNode::CrossSingleton { metadata, .. } => metadata,
4470 HydroNode::Join { metadata, .. } => metadata,
4471 HydroNode::JoinHalf { metadata, .. } => metadata,
4472 HydroNode::Difference { metadata, .. } => metadata,
4473 HydroNode::AntiJoin { metadata, .. } => metadata,
4474 HydroNode::ResolveFutures { metadata, .. } => metadata,
4475 HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4476 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4477 HydroNode::Map { metadata, .. } => metadata,
4478 HydroNode::FlatMap { metadata, .. } => metadata,
4479 HydroNode::FlatMapStreamBlocking { metadata, .. } => metadata,
4480 HydroNode::Filter { metadata, .. } => metadata,
4481 HydroNode::FilterMap { metadata, .. } => metadata,
4482 HydroNode::DeferTick { metadata, .. } => metadata,
4483 HydroNode::Enumerate { metadata, .. } => metadata,
4484 HydroNode::Inspect { metadata, .. } => metadata,
4485 HydroNode::Unique { metadata, .. } => metadata,
4486 HydroNode::Sort { metadata, .. } => metadata,
4487 HydroNode::Scan { metadata, .. } => metadata,
4488 HydroNode::ScanAsyncBlocking { metadata, .. } => metadata,
4489 HydroNode::Fold { metadata, .. } => metadata,
4490 HydroNode::FoldKeyed { metadata, .. } => metadata,
4491 HydroNode::Reduce { metadata, .. } => metadata,
4492 HydroNode::ReduceKeyed { metadata, .. } => metadata,
4493 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4494 HydroNode::ExternalInput { metadata, .. } => metadata,
4495 HydroNode::Network { metadata, .. } => metadata,
4496 HydroNode::Counter { metadata, .. } => metadata,
4497 }
4498 }
4499
4500 pub fn input(&self) -> Vec<&HydroNode> {
4501 match self {
4502 HydroNode::Placeholder => {
4503 panic!()
4504 }
4505 HydroNode::Source { .. }
4506 | HydroNode::SingletonSource { .. }
4507 | HydroNode::ExternalInput { .. }
4508 | HydroNode::CycleSource { .. }
4509 | HydroNode::Tee { .. }
4510 | HydroNode::Partition { .. } => {
4511 vec![]
4513 }
4514 HydroNode::Cast { inner, .. }
4515 | HydroNode::ObserveNonDet { inner, .. }
4516 | HydroNode::YieldConcat { inner, .. }
4517 | HydroNode::BeginAtomic { inner, .. }
4518 | HydroNode::EndAtomic { inner, .. }
4519 | HydroNode::Batch { inner, .. } => {
4520 vec![inner]
4521 }
4522 HydroNode::Chain { first, second, .. } => {
4523 vec![first, second]
4524 }
4525 HydroNode::MergeOrdered { first, second, .. } => {
4526 vec![first, second]
4527 }
4528 HydroNode::ChainFirst { first, second, .. } => {
4529 vec![first, second]
4530 }
4531 HydroNode::CrossProduct { left, right, .. }
4532 | HydroNode::CrossSingleton { left, right, .. }
4533 | HydroNode::Join { left, right, .. }
4534 | HydroNode::JoinHalf { left, right, .. } => {
4535 vec![left, right]
4536 }
4537 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
4538 vec![pos, neg]
4539 }
4540 HydroNode::Map { input, .. }
4541 | HydroNode::FlatMap { input, .. }
4542 | HydroNode::FlatMapStreamBlocking { input, .. }
4543 | HydroNode::Filter { input, .. }
4544 | HydroNode::FilterMap { input, .. }
4545 | HydroNode::Sort { input, .. }
4546 | HydroNode::DeferTick { input, .. }
4547 | HydroNode::Enumerate { input, .. }
4548 | HydroNode::Inspect { input, .. }
4549 | HydroNode::Unique { input, .. }
4550 | HydroNode::Network { input, .. }
4551 | HydroNode::Counter { input, .. }
4552 | HydroNode::ResolveFutures { input, .. }
4553 | HydroNode::ResolveFuturesBlocking { input, .. }
4554 | HydroNode::ResolveFuturesOrdered { input, .. }
4555 | HydroNode::Fold { input, .. }
4556 | HydroNode::FoldKeyed { input, .. }
4557 | HydroNode::Reduce { input, .. }
4558 | HydroNode::ReduceKeyed { input, .. }
4559 | HydroNode::Scan { input, .. }
4560 | HydroNode::ScanAsyncBlocking { input, .. } => {
4561 vec![input]
4562 }
4563 HydroNode::ReduceKeyedWatermark {
4564 input, watermark, ..
4565 } => {
4566 vec![input, watermark]
4567 }
4568 }
4569 }
4570
4571 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
4572 self.input()
4573 .iter()
4574 .map(|input_node| input_node.metadata())
4575 .collect()
4576 }
4577
4578 pub fn is_shared_with_others(&self) -> bool {
4582 match self {
4583 HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
4584 Rc::strong_count(&inner.0) > 1
4585 }
4586 _ => false,
4587 }
4588 }
4589
4590 pub fn print_root(&self) -> String {
4591 match self {
4592 HydroNode::Placeholder => {
4593 panic!()
4594 }
4595 HydroNode::Cast { .. } => "Cast()".to_owned(),
4596 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
4597 HydroNode::Source { source, .. } => format!("Source({:?})", source),
4598 HydroNode::SingletonSource {
4599 value,
4600 first_tick_only,
4601 ..
4602 } => format!(
4603 "SingletonSource({:?}, first_tick_only={})",
4604 value, first_tick_only
4605 ),
4606 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
4607 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
4608 HydroNode::Partition { f, is_true, .. } => {
4609 format!("Partition({:?}, is_true={})", f, is_true)
4610 }
4611 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
4612 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
4613 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
4614 HydroNode::Batch { .. } => "Batch()".to_owned(),
4615 HydroNode::Chain { first, second, .. } => {
4616 format!("Chain({}, {})", first.print_root(), second.print_root())
4617 }
4618 HydroNode::MergeOrdered { first, second, .. } => {
4619 format!(
4620 "MergeOrdered({}, {})",
4621 first.print_root(),
4622 second.print_root()
4623 )
4624 }
4625 HydroNode::ChainFirst { first, second, .. } => {
4626 format!(
4627 "ChainFirst({}, {})",
4628 first.print_root(),
4629 second.print_root()
4630 )
4631 }
4632 HydroNode::CrossProduct { left, right, .. } => {
4633 format!(
4634 "CrossProduct({}, {})",
4635 left.print_root(),
4636 right.print_root()
4637 )
4638 }
4639 HydroNode::CrossSingleton { left, right, .. } => {
4640 format!(
4641 "CrossSingleton({}, {})",
4642 left.print_root(),
4643 right.print_root()
4644 )
4645 }
4646 HydroNode::Join { left, right, .. } => {
4647 format!("Join({}, {})", left.print_root(), right.print_root())
4648 }
4649 HydroNode::JoinHalf { left, right, .. } => {
4650 format!("JoinHalf({}, {})", left.print_root(), right.print_root())
4651 }
4652 HydroNode::Difference { pos, neg, .. } => {
4653 format!("Difference({}, {})", pos.print_root(), neg.print_root())
4654 }
4655 HydroNode::AntiJoin { pos, neg, .. } => {
4656 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
4657 }
4658 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
4659 HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
4660 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
4661 HydroNode::Map { f, .. } => format!("Map({:?})", f),
4662 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
4663 HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
4664 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
4665 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
4666 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
4667 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
4668 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
4669 HydroNode::Unique { .. } => "Unique()".to_owned(),
4670 HydroNode::Sort { .. } => "Sort()".to_owned(),
4671 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
4672 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
4673 HydroNode::ScanAsyncBlocking { init, acc, .. } => {
4674 format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
4675 }
4676 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
4677 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
4678 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
4679 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
4680 HydroNode::Network { .. } => "Network()".to_owned(),
4681 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
4682 HydroNode::Counter { tag, duration, .. } => {
4683 format!("Counter({:?}, {:?})", tag, duration)
4684 }
4685 }
4686 }
4687}
4688
/// Builds the sink/source expressions and the connection callback for one
/// network edge between two top-level locations.
///
/// For each supported pairing (process or cluster on either side) this looks
/// up the instantiated deployment node for both endpoints, takes the next
/// port from each (sender first, then receiver), and calls the matching
/// `D::*_sink_source` followed by the matching `D::*_connect`.
///
/// Returns `(sink, source, connect_fn)`, where `connect_fn` is whatever the
/// `D::*_connect` call produced.
///
/// # Panics
///
/// * If a referenced process or cluster is missing from `processes` /
///   `clusters`.
/// * If either location is a `Tick` or `Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Lookup helpers that return an owned copy of the node, or panic when the
    // location was never instantiated.
    let process_node = |key: LocationKey| match processes.get(key) {
        Some(node) => node.clone(),
        None => panic!("A process used in the graph was not instantiated: {}", key),
    };
    let cluster_node = |key: LocationKey| match clusters.get(key) {
        Some(node) => node.clone(),
        None => panic!("A cluster used in the graph was not instantiated: {}", key),
    };

    let (endpoints, connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let sender = process_node(from);
            let receiver = process_node(to);

            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::o2o_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::o2o_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let sender = process_node(from);
            let receiver = cluster_node(to);

            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::o2m_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::o2m_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let sender = cluster_node(from);
            let receiver = process_node(to);

            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::m2o_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::m2o_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let sender = cluster_node(from);
            let receiver = cluster_node(to);

            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::m2m_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::m2m_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        // Tick and Atomic locations are rejected on either side of the edge.
        (LocationId::Tick(_, _) | LocationId::Atomic(_), _)
        | (_, LocationId::Tick(_, _) | LocationId::Atomic(_)) => panic!(),
    };

    let (sink, source) = endpoints;
    (sink, source, connect_fn)
}
4830
// Test-only submodule, compiled only under `cfg(test)`. Its body lives in a separate file
// that is not part of this one.
#[cfg(test)]
mod serde_test;
4833
// Unit tests for the IR types' in-memory sizes and for `simplify_q_macro`.
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins `size_of::<HydroNode>()` to 248 bytes.
    ///
    /// Ignored when the `build` feature is off, because the expected size assumes the
    /// feature-gated fields are present.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        // NOTE(review): presumably guards against unintended growth of the enum; update the
        // constant deliberately when the layout is meant to change.
        assert_eq!(size_of::<HydroNode>(), 248);
    }

    /// Pins `size_of::<HydroRoot>()` to 136 bytes.
    ///
    /// Ignored when the `build` feature is off, because the expected size assumes the
    /// feature-gated fields are present.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        assert_eq!(size_of::<HydroRoot>(), 136);
    }

    /// An ordinary expression (`x + y`) must come back from `simplify_q_macro` unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        // Pass a clone so the original stays available for the equality check.
        let result = simplify_q_macro(simple_expr.clone());
        assert_eq!(result, simple_expr);
    }

    /// Runs `simplify_q_macro` on an expression produced by splicing a real `q!` closure and
    /// compares the resulting token string against the stored snapshot.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        // Splice with the unit context to obtain the expression for the quoted closure.
        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Same as the snapshot test above, but the quoted code is a block that first binds `foo`
    /// and then yields a `move` closure, so the quoted source does not begin with `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let stageleft_call = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}