Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31use crate::location::dynamic::LocationId;
32use crate::location::{LocationKey, NetworkHint};
33
34pub mod backtrace;
35use backtrace::Backtrace;
36
/// Wrapper that displays only the tokens of a parsed expr.
///
/// Boxes `syn::Expr` which is ~240 bytes.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
42
43impl serde::Serialize for DebugExpr {
44    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
45        serializer.serialize_str(&self.to_string())
46    }
47}
48
49impl From<syn::Expr> for DebugExpr {
50    fn from(expr: syn::Expr) -> Self {
51        Self(Box::new(expr))
52    }
53}
54
55impl Deref for DebugExpr {
56    type Target = syn::Expr;
57
58    fn deref(&self) -> &Self::Target {
59        &self.0
60    }
61}
62
63impl ToTokens for DebugExpr {
64    fn to_tokens(&self, tokens: &mut TokenStream) {
65        self.0.to_tokens(tokens);
66    }
67}
68
69impl Debug for DebugExpr {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        write!(f, "{}", self.0.to_token_stream())
72    }
73}
74
75impl Display for DebugExpr {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        let original = self.0.as_ref().clone();
78        let simplified = simplify_q_macro(original);
79
80        // For now, just use quote formatting without trying to parse as a statement
81        // This avoids the syn::parse_quote! issues entirely
82        write!(f, "q!({})", quote::quote!(#simplified))
83    }
84}
85
86/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
87fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
88    // Try to parse the token string as a syn::Expr
89    // Use a visitor to simplify q! macro expansions
90    let mut simplifier = QMacroSimplifier::new();
91    simplifier.visit_expr_mut(&mut expr);
92
93    // If we found and simplified a q! macro, return the simplified version
94    if let Some(simplified) = simplifier.simplified_result {
95        simplified
96    } else {
97        expr
98    }
99}
100
/// AST visitor that simplifies q! macro expansions.
///
/// After a walk, `simplified_result` holds the expression extracted from the first call
/// whose path starts with `stageleft::runtime_support` and whose final segment name
/// contains `_type_hint`, provided one of that call's arguments yields a closure.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// Expression captured by the visitor; `None` until a matching call is found.
    /// Once set, the visitor stops descending.
    pub simplified_result: Option<syn::Expr>,
}
106
107impl QMacroSimplifier {
108    pub fn new() -> Self {
109        Self::default()
110    }
111}
112
113impl VisitMut for QMacroSimplifier {
114    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
115        // Check if we already found a result to avoid further processing
116        if self.simplified_result.is_some() {
117            return;
118        }
119
120        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
121            // Look for calls to stageleft::runtime_support::fn*
122            && self.is_stageleft_runtime_support_call(&path_expr.path)
123            // Try to extract the closure from the arguments
124            && let Some(closure) = self.extract_closure_from_args(&call.args)
125        {
126            self.simplified_result = Some(closure);
127            return;
128        }
129
130        // Continue visiting child expressions using the default implementation
131        // Use the default visitor to avoid infinite recursion
132        syn::visit_mut::visit_expr_mut(self, expr);
133    }
134}
135
136impl QMacroSimplifier {
137    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
138        // Check if this is a call to stageleft::runtime_support::fn*
139        if let Some(last_segment) = path.segments.last() {
140            let fn_name = last_segment.ident.to_string();
141            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
142            fn_name.contains("_type_hint")
143                && path.segments.len() > 2
144                && path.segments[0].ident == "stageleft"
145                && path.segments[1].ident == "runtime_support"
146        } else {
147            false
148        }
149    }
150
151    fn extract_closure_from_args(
152        &self,
153        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
154    ) -> Option<syn::Expr> {
155        // Look through the arguments for a closure expression
156        for arg in args {
157            if let syn::Expr::Closure(_) = arg {
158                return Some(arg.clone());
159            }
160            // Also check for closures nested in other expressions (like blocks)
161            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
162                return Some(closure_expr);
163            }
164        }
165        None
166    }
167
168    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
169        let mut visitor = ClosureFinder {
170            found_closure: None,
171            prefer_inner_blocks: true,
172        };
173        visitor.visit_expr(expr);
174        visitor.found_closure
175    }
176}
177
/// Visitor that finds closures in expressions with special block handling
struct ClosureFinder {
    /// First match found so far; once this is `Some`, the visitor stops searching.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block expression whose direct statements include a nested block
    /// containing a closure yields that nested block rather than the bare closure.
    prefer_inner_blocks: bool,
}
183
184impl<'ast> Visit<'ast> for ClosureFinder {
185    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
186        // If we already found a closure, don't continue searching
187        if self.found_closure.is_some() {
188            return;
189        }
190
191        match expr {
192            syn::Expr::Closure(_) => {
193                self.found_closure = Some(expr.clone());
194            }
195            syn::Expr::Block(block) if self.prefer_inner_blocks => {
196                // Special handling for blocks - look for inner blocks that contain closures
197                for stmt in &block.block.stmts {
198                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
199                        && let syn::Expr::Block(_) = stmt_expr
200                    {
201                        // Check if this nested block contains a closure
202                        let mut inner_visitor = ClosureFinder {
203                            found_closure: None,
204                            prefer_inner_blocks: false, // Avoid infinite recursion
205                        };
206                        inner_visitor.visit_expr(stmt_expr);
207                        if inner_visitor.found_closure.is_some() {
208                            // Found a closure in an inner block, return that block
209                            self.found_closure = Some(stmt_expr.clone());
210                            return;
211                        }
212                    }
213                }
214
215                // If no inner block with closure found, continue with normal visitation
216                visit::visit_expr(self, expr);
217
218                // If we found a closure, just return the closure itself, not the whole block
219                // unless we're in the special case where we want the containing block
220                if self.found_closure.is_some() {
221                    // The closure was found during visitation, no need to wrap in block
222                }
223            }
224            _ => {
225                // Use default visitor behavior for all other expressions
226                visit::visit_expr(self, expr);
227            }
228        }
229    }
230}
231
/// Wrapper around a parsed type whose [`Debug`] output is the type's tokens.
///
/// Serialization likewise emits the token stream as a string.
///
/// Boxes `syn::Type` which is ~320 bytes.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
237
238impl From<syn::Type> for DebugType {
239    fn from(t: syn::Type) -> Self {
240        Self(Box::new(t))
241    }
242}
243
244impl Deref for DebugType {
245    type Target = syn::Type;
246
247    fn deref(&self) -> &Self::Target {
248        &self.0
249    }
250}
251
252impl ToTokens for DebugType {
253    fn to_tokens(&self, tokens: &mut TokenStream) {
254        self.0.to_tokens(tokens);
255    }
256}
257
258impl Debug for DebugType {
259    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260        write!(f, "{}", self.0.to_token_stream())
261    }
262}
263
264impl serde::Serialize for DebugType {
265    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
266        serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
267    }
268}
269
270fn serialize_backtrace_as_span<S: serde::Serializer>(
271    backtrace: &Backtrace,
272    serializer: S,
273) -> Result<S::Ok, S::Error> {
274    match backtrace.format_span() {
275        Some(span) => serializer.serialize_some(&span),
276        None => serializer.serialize_none(),
277    }
278}
279
280fn serialize_ident<S: serde::Serializer>(
281    ident: &syn::Ident,
282    serializer: S,
283) -> Result<S::Ok, S::Error> {
284    serializer.serialize_str(&ident.to_string())
285}
286
/// Instantiation state of a network connection stored in the IR
/// (for example `HydroRoot::SendExternal::instantiate_fn`).
///
/// Only [`Self::Building`] can be cloned or serialized; both operations panic on
/// [`Self::Finalized`].
pub enum DebugInstantiate {
    /// Not yet instantiated.
    Building,
    /// Instantiated: holds the sink/source expressions and an optional connect callback.
    Finalized(Box<DebugInstantiateFinalized>),
}
291
292impl serde::Serialize for DebugInstantiate {
293    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
294        match self {
295            DebugInstantiate::Building => {
296                serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
297            }
298            DebugInstantiate::Finalized(_) => {
299                panic!(
300                    "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
301                )
302            }
303        }
304    }
305}
306
/// Payload of [`DebugInstantiate::Finalized`]: the expressions and callback produced when
/// a network connection is instantiated.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    // NOTE(review): presumably the expression spliced into `dest_sink(..)` on the sending
    // side — confirm against the code-gen that reads this field.
    sink: syn::Expr,
    // NOTE(review): presumably the expression spliced into `source_stream(..)` on the
    // receiving side — confirm against the code-gen that reads this field.
    source: syn::Expr,
    // One-shot callback; `Option` so it can be taken out and invoked at most once.
    // NOTE(review): when it is invoked is not visible in this part of the file.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
319
320impl From<DebugInstantiateFinalized> for DebugInstantiate {
321    fn from(f: DebugInstantiateFinalized) -> Self {
322        Self::Finalized(Box::new(f))
323    }
324}
325
326impl Debug for DebugInstantiate {
327    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
328        write!(f, "<network instantiate>")
329    }
330}
331
332impl Hash for DebugInstantiate {
333    fn hash<H: Hasher>(&self, _state: &mut H) {
334        // Do nothing
335    }
336}
337
338impl Clone for DebugInstantiate {
339    fn clone(&self) -> Self {
340        match self {
341            DebugInstantiate::Building => DebugInstantiate::Building,
342            DebugInstantiate::Finalized(_) => {
343                panic!("DebugInstantiate::Finalized should not be cloned")
344            }
345        }
346    }
347}
348
/// Tracks the instantiation state of a `ClusterMembers` source.
///
/// During `compile_network`, the first `ClusterMembers` node for a given
/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
/// receives the expression returned by `Deploy::cluster_membership_stream`.
/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
/// during code-gen they simply reference the tee output of the first node
/// instead of creating a redundant `source_stream`.
///
/// Stored as the second field of [`HydroSource::ClusterMembers`].
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum ClusterMembersState {
    /// Not yet instantiated (the state before `compile_network` has processed the node).
    Uninit,
    /// The primary instance: holds the stream expression and will emit
    /// `source_stream(expr) -> tee()` during code-gen.
    Stream(DebugExpr),
    /// A secondary instance that references the tee output of the primary.
    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
    /// can derive the deterministic tee ident without extra state.
    Tee(LocationId, LocationId),
}
369
/// A source in a Hydro graph, where data enters the graph.
///
/// NOTE(review): the per-variant notes below are inferred from names and field types only;
/// the code-gen for sources is not visible in this part of the file — confirm before
/// relying on them.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum HydroSource {
    /// Carries one expression; presumably evaluates to a stream — confirm.
    Stream(DebugExpr),
    /// Carries no data; presumably input arriving from an external network — confirm.
    ExternalNetwork(),
    /// Carries one expression; presumably evaluates to an iterable — confirm.
    Iter(DebugExpr),
    /// Carries no data.
    Spin(),
    /// Cluster membership source: a location plus its [`ClusterMembersState`].
    /// NOTE(review): the `LocationId` is presumably the target cluster — confirm.
    ClusterMembers(LocationId, ClusterMembersState),
    /// Identified by an ident, which is serialized as a string.
    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
    /// Identified by an ident, which is serialized as a string.
    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
}
381
#[cfg(feature = "build")]
/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
/// and simulations.
///
/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
///
/// The per-method notes below describe the `SecondaryMap<LocationKey, FlatGraphBuilder>`
/// implementation in this file; other implementations may emit different code.
pub trait DfirBuilder {
    /// Whether the representation of singletons should include intermediate states.
    fn singleton_intermediates(&self) -> bool;

    /// Gets the DFIR builder for the given location, creating it if necessary.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the code connecting `in_ident` to `out_ident` for a batch operation.
    ///
    /// The `SecondaryMap` implementation forwards the input unchanged, except that a bounded
    /// `Singleton`, `Optional` or `KeyedSingleton` input is routed through
    /// `persist::<'static>()` (and must be at a top-level location).
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the code connecting `in_ident` to `out_ident` when a collection leaves a tick.
    ///
    /// The `SecondaryMap` implementation emits a plain `out = in` statement.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the code connecting `in_ident` to `out_ident` at the start of an atomic section.
    ///
    /// The `SecondaryMap` implementation emits a plain `out = in` statement.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the code connecting `in_ident` to `out_ident` at the end of an atomic section.
    ///
    /// The `SecondaryMap` implementation emits a plain `out = in` statement.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the code connecting `in_ident` to `out_ident` at a non-deterministic observation.
    ///
    /// The `SecondaryMap` implementation ignores `trusted` and both kinds and emits a plain
    /// `out = in` statement.
    /// NOTE(review): the meaning of `trusted` is not visible in this part of the file.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits the code merging `first_ident` and `second_ident` into `out_ident`.
    ///
    /// The `SecondaryMap` implementation emits a `union()` with `first_ident` on input `[0]`
    /// and `second_ident` on input `[1]`, passing `operator_tag` through to the builder.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    );

    /// Emits both halves of a network edge from `from` to `to`.
    ///
    /// The `SecondaryMap` implementation emits, at `from`,
    /// `input_ident -> [map(serialize) ->] dest_sink(sink)` and, at `to`,
    /// `out_ident = source_stream(source) [-> map(deserialize)]`. The two statements are tagged
    /// `send{tag_id}` and `recv{tag_id}` respectively.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits the receiving half of an external connection at location `on`.
    ///
    /// The `SecondaryMap` implementation emits
    /// `out_ident = source_stream(source_expr) [-> map(deserialize)]`, tagged `recv{tag_id}`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Emits the sending half of an external connection at location `on`.
    ///
    /// The `SecondaryMap` implementation emits
    /// `input_ident -> [map(serialize) ->] dest_sink(sink_expr)`, tagged `send{tag_id}`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    );
}
487
/// [`DfirBuilder`] backed by one [`FlatGraphBuilder`] per root location key.
///
/// Most hooks simply forward their input to their output; only `batch`, `merge_ordered` and
/// the network/external hooks emit real operators.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    /// This implementation always reports `false`.
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Returns the builder keyed by `location`'s root, inserting a default one if absent.
    ///
    /// # Panics
    ///
    /// Panics with "location was removed" if the map cannot provide an entry for the key.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        self.entry(root_key)
            .expect("location was removed")
            .or_default()
    }

    /// Forwards `in_ident` to `out_ident`; a bounded `Singleton`, `Optional` or
    /// `KeyedSingleton` input is additionally routed through `persist::<'static>()`.
    ///
    /// # Panics
    ///
    /// Panics if the persisted path is taken while `in_location` is not top-level.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(in_location.root());

        let needs_persist = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );
        if !needs_persist {
            add_passthrough_stmt(builder, in_ident, out_ident);
            return;
        }

        assert!(in_location.is_top_level());
        builder.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    /// Forwards `in_ident` to `out_ident` unchanged at `in_location`'s root.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        add_passthrough_stmt(self.get_dfir_mut(in_location.root()), in_ident, out_ident);
    }

    /// Forwards `in_ident` to `out_ident` unchanged at `in_location`'s root.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_passthrough_stmt(self.get_dfir_mut(in_location.root()), in_ident, out_ident);
    }

    /// Forwards `in_ident` to `out_ident` unchanged at `in_location`'s root.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        add_passthrough_stmt(self.get_dfir_mut(in_location.root()), in_ident, out_ident);
    }

    /// Forwards `in_ident` to `out_ident` unchanged; `_trusted` and the kinds are ignored.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_passthrough_stmt(self.get_dfir_mut(location), in_ident, out_ident);
    }

    /// Emits a `union()` named `out_ident`, fed by `first_ident` on `[0]` and
    /// `second_ident` on `[1]`, tagged with `operator_tag`.
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = union();
                #first_ident -> [0]#out_ident;
                #second_ident -> [1]#out_ident;
            },
            None,
            operator_tag,
        );
    }

    /// Emits the sending statement at `from` and the receiving statement at `to`,
    /// inserting `map(..)` stages only when a (de)serializer is supplied.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{}", tag_id);
        let recv_tag = format!("recv{}", tag_id);

        let sender_builder = self.get_dfir_mut(from);
        match serialize {
            Some(serialize_pipeline) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }

        let receiver_builder = self.get_dfir_mut(to);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Emits `out_ident = source_stream(source_expr)` at `on`, followed by
    /// `map(deserialize)` when a deserializer is supplied; tagged `recv{tag_id}`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let recv_tag = format!("recv{}", tag_id);
        let receiver_builder = self.get_dfir_mut(on);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Emits `input_ident -> dest_sink(sink_expr)` at `on`, preceded by
    /// `map(serialize)` when a serializer is supplied; tagged `send{tag_id}`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{}", tag_id);
        let sender_builder = self.get_dfir_mut(on);
        match serialize {
            Some(serialize_fn) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }
    }
}

/// Adds the statement `out_ident = in_ident;` to `builder`, passing `None` for both of
/// `add_dfir`'s remaining arguments.
#[cfg(feature = "build")]
fn add_passthrough_stmt(
    builder: &mut FlatGraphBuilder,
    in_ident: syn::Ident,
    out_ident: &syn::Ident,
) {
    builder.add_dfir(
        parse_quote! {
            #out_ident = #in_ident;
        },
        None,
        None,
    );
}
743
/// Either a DFIR builder to emit code into, or a pair of callbacks to invoke on IR elements
/// instead.
///
/// NOTE(review): the `&mut usize` handed to both callbacks is presumably a running
/// statement/ID counter — confirm against the call sites, which are not visible here.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Emit DFIR through the borrowed [`DfirBuilder`].
    Builders(&'a mut dyn DfirBuilder),
    /// Invoke `L` on each [`HydroRoot`] and `N` on each [`HydroNode`].
    Callback(L, N),
}
753
/// A root in a Hydro graph, which is a pipeline that doesn't emit
/// any downstream values. Traversals over the dataflow graph and
/// generating DFIR start from roots.
///
/// Every variant owns its upstream pipeline in `input` and carries per-operator
/// metadata in `op_metadata`.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroRoot {
    /// Root carrying an expression `f`.
    /// NOTE(review): presumably `f` is applied to each element of `input` — confirm.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends `input` to a port on an external location.
    ///
    /// `instantiate_fn` starts as `DebugInstantiate::Building` and is replaced with a
    /// finalized value by `compile_network`.
    SendExternal {
        /// Key of the destination external location.
        to_external_key: LocationKey,
        /// Port on the destination external.
        to_port_id: ExternalPortId,
        // NOTE(review): exact semantics of `to_many` and `unpaired` are decided in
        // `compile_network` — see that method before relying on either flag.
        to_many: bool,
        unpaired: bool,
        /// Optional serializer expression applied before sending.
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root carrying a `sink` expression.
    /// NOTE(review): presumably `input` is written into `sink` — confirm.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root tied to the cycle identified by `cycle_id`.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root identified by `ident`, which is serialized as a string.
    EmbeddedOutput {
        #[serde(serialize_with = "serialize_ident")]
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root with no extra payload beyond its input.
    Null {
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
795
796impl HydroRoot {
797    #[cfg(feature = "build")]
798    #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
799    pub fn compile_network<'a, D>(
800        &mut self,
801        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
802        seen_tees: &mut SeenSharedNodes,
803        seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
804        processes: &SparseSecondaryMap<LocationKey, D::Process>,
805        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
806        externals: &SparseSecondaryMap<LocationKey, D::External>,
807        env: &mut D::InstantiateEnv,
808    ) where
809        D: Deploy<'a>,
810    {
811        let refcell_extra_stmts = RefCell::new(extra_stmts);
812        let refcell_env = RefCell::new(env);
813        let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
814        self.transform_bottom_up(
815            &mut |l| {
816                if let HydroRoot::SendExternal {
817                    input,
818                    to_external_key,
819                    to_port_id,
820                    to_many,
821                    unpaired,
822                    instantiate_fn,
823                    ..
824                } = l
825                {
826                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
827                        DebugInstantiate::Building => {
828                            let to_node = externals
829                                .get(*to_external_key)
830                                .unwrap_or_else(|| {
831                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
832                                })
833                                .clone();
834
835                            match input.metadata().location_id.root() {
836                                &LocationId::Process(process_key) => {
837                                    if *to_many {
838                                        (
839                                            (
840                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
841                                                parse_quote!(DUMMY),
842                                            ),
843                                            Box::new(|| {}) as Box<dyn FnOnce()>,
844                                        )
845                                    } else {
846                                        let from_node = processes
847                                            .get(process_key)
848                                            .unwrap_or_else(|| {
849                                                panic!("A process used in the graph was not instantiated: {}", process_key)
850                                            })
851                                            .clone();
852
853                                        let sink_port = from_node.next_port();
854                                        let source_port = to_node.next_port();
855
856                                        if *unpaired {
857                                            use stageleft::quote_type;
858                                            use tokio_util::codec::LengthDelimitedCodec;
859
860                                            to_node.register(*to_port_id, source_port.clone());
861
862                                            let _ = D::e2o_source(
863                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
864                                                &to_node, &source_port,
865                                                &from_node, &sink_port,
866                                                &quote_type::<LengthDelimitedCodec>(),
867                                                format!("{}_{}", *to_external_key, *to_port_id)
868                                            );
869                                        }
870
871                                        (
872                                            (
873                                                D::o2e_sink(
874                                                    &from_node,
875                                                    &sink_port,
876                                                    &to_node,
877                                                    &source_port,
878                                                    format!("{}_{}", *to_external_key, *to_port_id)
879                                                ),
880                                                parse_quote!(DUMMY),
881                                            ),
882                                            if *unpaired {
883                                                D::e2o_connect(
884                                                    &to_node,
885                                                    &source_port,
886                                                    &from_node,
887                                                    &sink_port,
888                                                    *to_many,
889                                                    NetworkHint::Auto,
890                                                )
891                                            } else {
892                                                Box::new(|| {}) as Box<dyn FnOnce()>
893                                            },
894                                        )
895                                    }
896                                }
897                                LocationId::Cluster(cluster_key) => {
898                                    let from_node = clusters
899                                        .get(*cluster_key)
900                                        .unwrap_or_else(|| {
901                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
902                                        })
903                                        .clone();
904
905                                    let sink_port = from_node.next_port();
906                                    let source_port = to_node.next_port();
907
908                                    if *unpaired {
909                                        to_node.register(*to_port_id, source_port.clone());
910                                    }
911
912                                    (
913                                        (
914                                            D::m2e_sink(
915                                                &from_node,
916                                                &sink_port,
917                                                &to_node,
918                                                &source_port,
919                                                format!("{}_{}", *to_external_key, *to_port_id)
920                                            ),
921                                            parse_quote!(DUMMY),
922                                        ),
923                                        Box::new(|| {}) as Box<dyn FnOnce()>,
924                                    )
925                                }
926                                _ => panic!()
927                            }
928                        },
929
930                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
931                    };
932
933                    *instantiate_fn = DebugInstantiateFinalized {
934                        sink: sink_expr,
935                        source: source_expr,
936                        connect_fn: Some(connect_fn),
937                    }
938                    .into();
939                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
940                    let element_type = match &input.metadata().collection_kind {
941                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
942                        _ => panic!("Embedded output must have Stream collection kind"),
943                    };
944                    let location_key = match input.metadata().location_id.root() {
945                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
946                        _ => panic!("Embedded output must be on a process or cluster"),
947                    };
948                    D::register_embedded_output(
949                        &mut refcell_env.borrow_mut(),
950                        location_key,
951                        ident,
952                        &element_type,
953                    );
954                }
955            },
956            &mut |n| {
957                if let HydroNode::Network {
958                    name,
959                    networking_info,
960                    input,
961                    instantiate_fn,
962                    metadata,
963                    ..
964                } = n
965                {
966                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
967                        DebugInstantiate::Building => instantiate_network::<D>(
968                            &mut refcell_env.borrow_mut(),
969                            input.metadata().location_id.root(),
970                            metadata.location_id.root(),
971                            processes,
972                            clusters,
973                            name.as_deref(),
974                            networking_info,
975                        ),
976
977                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
978                    };
979
980                    *instantiate_fn = DebugInstantiateFinalized {
981                        sink: sink_expr,
982                        source: source_expr,
983                        connect_fn: Some(connect_fn),
984                    }
985                    .into();
986                } else if let HydroNode::ExternalInput {
987                    from_external_key,
988                    from_port_id,
989                    from_many,
990                    codec_type,
991                    port_hint,
992                    instantiate_fn,
993                    metadata,
994                    ..
995                } = n
996                {
997                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
998                        DebugInstantiate::Building => {
999                            let from_node = externals
1000                                .get(*from_external_key)
1001                                .unwrap_or_else(|| {
1002                                    panic!(
1003                                        "A external used in the graph was not instantiated: {}",
1004                                        from_external_key,
1005                                    )
1006                                })
1007                                .clone();
1008
1009                            match metadata.location_id.root() {
1010                                &LocationId::Process(process_key) => {
1011                                    let to_node = processes
1012                                        .get(process_key)
1013                                        .unwrap_or_else(|| {
1014                                            panic!("A process used in the graph was not instantiated: {}", process_key)
1015                                        })
1016                                        .clone();
1017
1018                                    let sink_port = from_node.next_port();
1019                                    let source_port = to_node.next_port();
1020
1021                                    from_node.register(*from_port_id, sink_port.clone());
1022
1023                                    (
1024                                        (
1025                                            parse_quote!(DUMMY),
1026                                            if *from_many {
1027                                                D::e2o_many_source(
1028                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1029                                                    &to_node, &source_port,
1030                                                    codec_type.0.as_ref(),
1031                                                    format!("{}_{}", *from_external_key, *from_port_id)
1032                                                )
1033                                            } else {
1034                                                D::e2o_source(
1035                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1036                                                    &from_node, &sink_port,
1037                                                    &to_node, &source_port,
1038                                                    codec_type.0.as_ref(),
1039                                                    format!("{}_{}", *from_external_key, *from_port_id)
1040                                                )
1041                                            },
1042                                        ),
1043                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1044                                    )
1045                                }
1046                                LocationId::Cluster(cluster_key) => {
1047                                    let to_node = clusters
1048                                        .get(*cluster_key)
1049                                        .unwrap_or_else(|| {
1050                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1051                                        })
1052                                        .clone();
1053
1054                                    let sink_port = from_node.next_port();
1055                                    let source_port = to_node.next_port();
1056
1057                                    from_node.register(*from_port_id, sink_port.clone());
1058
1059                                    (
1060                                        (
1061                                            parse_quote!(DUMMY),
1062                                            D::e2m_source(
1063                                                refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1064                                                &from_node, &sink_port,
1065                                                &to_node, &source_port,
1066                                                codec_type.0.as_ref(),
1067                                                format!("{}_{}", *from_external_key, *from_port_id)
1068                                            ),
1069                                        ),
1070                                        D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1071                                    )
1072                                }
1073                                _ => panic!()
1074                            }
1075                        },
1076
1077                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1078                    };
1079
1080                    *instantiate_fn = DebugInstantiateFinalized {
1081                        sink: sink_expr,
1082                        source: source_expr,
1083                        connect_fn: Some(connect_fn),
1084                    }
1085                    .into();
1086                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1087                    let element_type = match &metadata.collection_kind {
1088                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1089                        _ => panic!("Embedded source must have Stream collection kind"),
1090                    };
1091                    let location_key = match metadata.location_id.root() {
1092                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1093                        _ => panic!("Embedded source must be on a process or cluster"),
1094                    };
1095                    D::register_embedded_stream_input(
1096                        &mut refcell_env.borrow_mut(),
1097                        location_key,
1098                        ident,
1099                        &element_type,
1100                    );
1101                } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1102                    let element_type = match &metadata.collection_kind {
1103                        CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1104                        _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1105                    };
1106                    let location_key = match metadata.location_id.root() {
1107                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1108                        _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1109                    };
1110                    D::register_embedded_singleton_input(
1111                        &mut refcell_env.borrow_mut(),
1112                        location_key,
1113                        ident,
1114                        &element_type,
1115                    );
1116                } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1117                    match state {
1118                        ClusterMembersState::Uninit => {
1119                            let at_location = metadata.location_id.root().clone();
1120                            let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
1121                            if refcell_seen_cluster_members.borrow_mut().insert(key) {
1122                                // First occurrence: call cluster_membership_stream and mark as Stream.
1123                                let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1124                                    D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1125                                    &(),
1126                                );
1127                                *state = ClusterMembersState::Stream(expr.into());
1128                            } else {
1129                                // Already instantiated for this (at, target) pair: just tee.
1130                                *state = ClusterMembersState::Tee(at_location, location_id.clone());
1131                            }
1132                        }
1133                        ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1134                            panic!("cluster members already finalized");
1135                        }
1136                    }
1137                }
1138            },
1139            seen_tees,
1140            false,
1141        );
1142    }
1143
1144    pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1145        self.transform_bottom_up(
1146            &mut |l| {
1147                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1148                    match instantiate_fn {
1149                        DebugInstantiate::Building => panic!("network not built"),
1150
1151                        DebugInstantiate::Finalized(finalized) => {
1152                            (finalized.connect_fn.take().unwrap())();
1153                        }
1154                    }
1155                }
1156            },
1157            &mut |n| {
1158                if let HydroNode::Network { instantiate_fn, .. }
1159                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1160                {
1161                    match instantiate_fn {
1162                        DebugInstantiate::Building => panic!("network not built"),
1163
1164                        DebugInstantiate::Finalized(finalized) => {
1165                            (finalized.connect_fn.take().unwrap())();
1166                        }
1167                    }
1168                }
1169            },
1170            seen_tees,
1171            false,
1172        );
1173    }
1174
1175    pub fn transform_bottom_up(
1176        &mut self,
1177        transform_root: &mut impl FnMut(&mut HydroRoot),
1178        transform_node: &mut impl FnMut(&mut HydroNode),
1179        seen_tees: &mut SeenSharedNodes,
1180        check_well_formed: bool,
1181    ) {
1182        self.transform_children(
1183            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1184            seen_tees,
1185        );
1186
1187        transform_root(self);
1188    }
1189
1190    pub fn transform_children(
1191        &mut self,
1192        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1193        seen_tees: &mut SeenSharedNodes,
1194    ) {
1195        match self {
1196            HydroRoot::ForEach { input, .. }
1197            | HydroRoot::SendExternal { input, .. }
1198            | HydroRoot::DestSink { input, .. }
1199            | HydroRoot::CycleSink { input, .. }
1200            | HydroRoot::EmbeddedOutput { input, .. }
1201            | HydroRoot::Null { input, .. } => {
1202                transform(input, seen_tees);
1203            }
1204        }
1205    }
1206
1207    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1208        match self {
1209            HydroRoot::ForEach {
1210                f,
1211                input,
1212                op_metadata,
1213            } => HydroRoot::ForEach {
1214                f: f.clone(),
1215                input: Box::new(input.deep_clone(seen_tees)),
1216                op_metadata: op_metadata.clone(),
1217            },
1218            HydroRoot::SendExternal {
1219                to_external_key,
1220                to_port_id,
1221                to_many,
1222                unpaired,
1223                serialize_fn,
1224                instantiate_fn,
1225                input,
1226                op_metadata,
1227            } => HydroRoot::SendExternal {
1228                to_external_key: *to_external_key,
1229                to_port_id: *to_port_id,
1230                to_many: *to_many,
1231                unpaired: *unpaired,
1232                serialize_fn: serialize_fn.clone(),
1233                instantiate_fn: instantiate_fn.clone(),
1234                input: Box::new(input.deep_clone(seen_tees)),
1235                op_metadata: op_metadata.clone(),
1236            },
1237            HydroRoot::DestSink {
1238                sink,
1239                input,
1240                op_metadata,
1241            } => HydroRoot::DestSink {
1242                sink: sink.clone(),
1243                input: Box::new(input.deep_clone(seen_tees)),
1244                op_metadata: op_metadata.clone(),
1245            },
1246            HydroRoot::CycleSink {
1247                cycle_id,
1248                input,
1249                op_metadata,
1250            } => HydroRoot::CycleSink {
1251                cycle_id: *cycle_id,
1252                input: Box::new(input.deep_clone(seen_tees)),
1253                op_metadata: op_metadata.clone(),
1254            },
1255            HydroRoot::EmbeddedOutput {
1256                ident,
1257                input,
1258                op_metadata,
1259            } => HydroRoot::EmbeddedOutput {
1260                ident: ident.clone(),
1261                input: Box::new(input.deep_clone(seen_tees)),
1262                op_metadata: op_metadata.clone(),
1263            },
1264            HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1265                input: Box::new(input.deep_clone(seen_tees)),
1266                op_metadata: op_metadata.clone(),
1267            },
1268        }
1269    }
1270
1271    #[cfg(feature = "build")]
1272    pub fn emit(
1273        &mut self,
1274        graph_builders: &mut dyn DfirBuilder,
1275        seen_tees: &mut SeenSharedNodes,
1276        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1277        next_stmt_id: &mut usize,
1278    ) {
1279        self.emit_core(
1280            &mut BuildersOrCallback::<
1281                fn(&mut HydroRoot, &mut usize),
1282                fn(&mut HydroNode, &mut usize),
1283            >::Builders(graph_builders),
1284            seen_tees,
1285            built_tees,
1286            next_stmt_id,
1287        );
1288    }
1289
1290    #[cfg(feature = "build")]
1291    pub fn emit_core(
1292        &mut self,
1293        builders_or_callback: &mut BuildersOrCallback<
1294            impl FnMut(&mut HydroRoot, &mut usize),
1295            impl FnMut(&mut HydroNode, &mut usize),
1296        >,
1297        seen_tees: &mut SeenSharedNodes,
1298        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1299        next_stmt_id: &mut usize,
1300    ) {
1301        match self {
1302            HydroRoot::ForEach { f, input, .. } => {
1303                let input_ident =
1304                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1305
1306                match builders_or_callback {
1307                    BuildersOrCallback::Builders(graph_builders) => {
1308                        graph_builders
1309                            .get_dfir_mut(&input.metadata().location_id)
1310                            .add_dfir(
1311                                parse_quote! {
1312                                    #input_ident -> for_each(#f);
1313                                },
1314                                None,
1315                                Some(&next_stmt_id.to_string()),
1316                            );
1317                    }
1318                    BuildersOrCallback::Callback(leaf_callback, _) => {
1319                        leaf_callback(self, next_stmt_id);
1320                    }
1321                }
1322
1323                *next_stmt_id += 1;
1324            }
1325
1326            HydroRoot::SendExternal {
1327                serialize_fn,
1328                instantiate_fn,
1329                input,
1330                ..
1331            } => {
1332                let input_ident =
1333                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1334
1335                match builders_or_callback {
1336                    BuildersOrCallback::Builders(graph_builders) => {
1337                        let (sink_expr, _) = match instantiate_fn {
1338                            DebugInstantiate::Building => (
1339                                syn::parse_quote!(DUMMY_SINK),
1340                                syn::parse_quote!(DUMMY_SOURCE),
1341                            ),
1342
1343                            DebugInstantiate::Finalized(finalized) => {
1344                                (finalized.sink.clone(), finalized.source.clone())
1345                            }
1346                        };
1347
1348                        graph_builders.create_external_output(
1349                            &input.metadata().location_id,
1350                            sink_expr,
1351                            &input_ident,
1352                            serialize_fn.as_ref(),
1353                            *next_stmt_id,
1354                        );
1355                    }
1356                    BuildersOrCallback::Callback(leaf_callback, _) => {
1357                        leaf_callback(self, next_stmt_id);
1358                    }
1359                }
1360
1361                *next_stmt_id += 1;
1362            }
1363
1364            HydroRoot::DestSink { sink, input, .. } => {
1365                let input_ident =
1366                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1367
1368                match builders_or_callback {
1369                    BuildersOrCallback::Builders(graph_builders) => {
1370                        graph_builders
1371                            .get_dfir_mut(&input.metadata().location_id)
1372                            .add_dfir(
1373                                parse_quote! {
1374                                    #input_ident -> dest_sink(#sink);
1375                                },
1376                                None,
1377                                Some(&next_stmt_id.to_string()),
1378                            );
1379                    }
1380                    BuildersOrCallback::Callback(leaf_callback, _) => {
1381                        leaf_callback(self, next_stmt_id);
1382                    }
1383                }
1384
1385                *next_stmt_id += 1;
1386            }
1387
1388            HydroRoot::CycleSink {
1389                cycle_id, input, ..
1390            } => {
1391                let input_ident =
1392                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1393
1394                match builders_or_callback {
1395                    BuildersOrCallback::Builders(graph_builders) => {
1396                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1397                            CollectionKind::KeyedSingleton {
1398                                key_type,
1399                                value_type,
1400                                ..
1401                            }
1402                            | CollectionKind::KeyedStream {
1403                                key_type,
1404                                value_type,
1405                                ..
1406                            } => {
1407                                parse_quote!((#key_type, #value_type))
1408                            }
1409                            CollectionKind::Stream { element_type, .. }
1410                            | CollectionKind::Singleton { element_type, .. }
1411                            | CollectionKind::Optional { element_type, .. } => {
1412                                parse_quote!(#element_type)
1413                            }
1414                        };
1415
1416                        let cycle_id_ident = cycle_id.as_ident();
1417                        graph_builders
1418                            .get_dfir_mut(&input.metadata().location_id)
1419                            .add_dfir(
1420                                parse_quote! {
1421                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1422                                },
1423                                None,
1424                                None,
1425                            );
1426                    }
1427                    // No ID, no callback
1428                    BuildersOrCallback::Callback(_, _) => {}
1429                }
1430            }
1431
1432            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1433                let input_ident =
1434                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1435
1436                match builders_or_callback {
1437                    BuildersOrCallback::Builders(graph_builders) => {
1438                        graph_builders
1439                            .get_dfir_mut(&input.metadata().location_id)
1440                            .add_dfir(
1441                                parse_quote! {
1442                                    #input_ident -> for_each(&mut #ident);
1443                                },
1444                                None,
1445                                Some(&next_stmt_id.to_string()),
1446                            );
1447                    }
1448                    BuildersOrCallback::Callback(leaf_callback, _) => {
1449                        leaf_callback(self, next_stmt_id);
1450                    }
1451                }
1452
1453                *next_stmt_id += 1;
1454            }
1455
1456            HydroRoot::Null { input, .. } => {
1457                let input_ident =
1458                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1459
1460                match builders_or_callback {
1461                    BuildersOrCallback::Builders(graph_builders) => {
1462                        graph_builders
1463                            .get_dfir_mut(&input.metadata().location_id)
1464                            .add_dfir(
1465                                parse_quote! {
1466                                    #input_ident -> for_each(|_| {});
1467                                },
1468                                None,
1469                                Some(&next_stmt_id.to_string()),
1470                            );
1471                    }
1472                    BuildersOrCallback::Callback(leaf_callback, _) => {
1473                        leaf_callback(self, next_stmt_id);
1474                    }
1475                }
1476
1477                *next_stmt_id += 1;
1478            }
1479        }
1480    }
1481
1482    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1483        match self {
1484            HydroRoot::ForEach { op_metadata, .. }
1485            | HydroRoot::SendExternal { op_metadata, .. }
1486            | HydroRoot::DestSink { op_metadata, .. }
1487            | HydroRoot::CycleSink { op_metadata, .. }
1488            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1489            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1490        }
1491    }
1492
1493    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1494        match self {
1495            HydroRoot::ForEach { op_metadata, .. }
1496            | HydroRoot::SendExternal { op_metadata, .. }
1497            | HydroRoot::DestSink { op_metadata, .. }
1498            | HydroRoot::CycleSink { op_metadata, .. }
1499            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1500            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1501        }
1502    }
1503
1504    pub fn input(&self) -> &HydroNode {
1505        match self {
1506            HydroRoot::ForEach { input, .. }
1507            | HydroRoot::SendExternal { input, .. }
1508            | HydroRoot::DestSink { input, .. }
1509            | HydroRoot::CycleSink { input, .. }
1510            | HydroRoot::EmbeddedOutput { input, .. }
1511            | HydroRoot::Null { input, .. } => input,
1512        }
1513    }
1514
1515    pub fn input_metadata(&self) -> &HydroIrMetadata {
1516        self.input().metadata()
1517    }
1518
1519    pub fn print_root(&self) -> String {
1520        match self {
1521            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1522            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1523            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1524            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1525            HydroRoot::EmbeddedOutput { ident, .. } => {
1526                format!("EmbeddedOutput({})", ident)
1527            }
1528            HydroRoot::Null { .. } => "Null".to_owned(),
1529        }
1530    }
1531
1532    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1533        match self {
1534            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1535                transform(f);
1536            }
1537            HydroRoot::SendExternal { .. }
1538            | HydroRoot::CycleSink { .. }
1539            | HydroRoot::EmbeddedOutput { .. }
1540            | HydroRoot::Null { .. } => {}
1541        }
1542    }
1543}
1544
/// Returns the clock id of the tick that `loc` belongs to, looking through any `Atomic`
/// wrappers; yields `None` when a non-tick location is reached first.
#[cfg(feature = "build")]
fn tick_of(loc: &LocationId) -> Option<ClockId> {
    let mut current = loc;
    loop {
        match current {
            LocationId::Tick(id, _) => return Some(*id),
            // Peel one atomic layer and keep looking.
            LocationId::Atomic(inner) => current = &**inner,
            _ => return None,
        }
    }
}
1553
/// Rewrites every tick id inside `loc` to its union-find representative.
///
/// `Tick` ids are replaced with `uf_find`'s result; both `Tick` and `Atomic` are then
/// followed into their nested location. `Process` and `Cluster` end the walk unchanged.
#[cfg(feature = "build")]
fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
    let nested: &mut LocationId = match loc {
        LocationId::Tick(id, inner) => {
            *id = uf_find(uf, *id);
            &mut **inner
        }
        LocationId::Atomic(inner) => &mut **inner,
        LocationId::Process(_) | LocationId::Cluster(_) => return,
    };

    remap_location(nested, uf);
}
1567
1568#[cfg(feature = "build")]
1569fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
1570    let p = *parent.get(&x).unwrap_or(&x);
1571    if p == x {
1572        return x;
1573    }
1574    let root = uf_find(parent, p);
1575    parent.insert(x, root);
1576    root
1577}
1578
/// Merges the sets containing `a` and `b` by pointing `a`'s representative at `b`'s.
///
/// Does nothing beyond the lookups when both ids already share a representative.
#[cfg(feature = "build")]
fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
    let root_a = uf_find(parent, a);
    let root_b = uf_find(parent, b);
    if root_a == root_b {
        return;
    }
    parent.insert(root_a, root_b);
}
1587
/// Unifies tick ids that are connected through `Batch` / `YieldConcat` nodes
/// and rewrites the IR to use one representative id per group.
///
/// Two bottom-up traversals are performed (both with well-formedness checks
/// disabled):
/// 1. for every `Batch` or `YieldConcat` node, if both the node's own
///    location and its inner node's location resolve to a tick (see
///    `tick_of`), the two tick ids are merged in a union-find;
/// 2. every node's `location_id` is rewritten via `remap_location`.
///
/// Roots are visited but not modified.
#[cfg(feature = "build")]
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();

    // Pass 1: collect unifications.
    let mut record_unification = |node: &mut HydroNode| {
        let (inner, metadata) = match node {
            HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
                (inner, metadata)
            }
            _ => return,
        };
        let inner_tick = tick_of(&inner.metadata().location_id);
        let outer_tick = tick_of(&metadata.location_id);
        if let (Some(a), Some(b)) = (inner_tick, outer_tick) {
            uf_union(&mut uf, a, b);
        }
    };
    transform_bottom_up(ir, &mut |_| {}, &mut record_unification, false);

    // Pass 2: rewrite all LocationIds.
    let mut rewrite_location = |node: &mut HydroNode| {
        remap_location(&mut node.metadata_mut().location_id, &mut uf);
    };
    transform_bottom_up(ir, &mut |_| {}, &mut rewrite_location, false);
}
1623
/// Emits every root in `ir` and returns the graph builders that were filled
/// in, keyed by [`LocationKey`].
///
/// The tee bookkeeping maps and the statement-id counter (starting at 0) are
/// shared across all roots.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    });

    builders
}
1640
/// Runs the emission traversal over `ir` in callback mode.
///
/// Instead of graph builders, `emit_core` is handed a
/// `BuildersOrCallback::Callback` wrapping `transform_root` and
/// `transform_node`. The tee bookkeeping maps and the statement-id counter
/// (starting at 0) are shared across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;

    for root in ir.iter_mut() {
        root.emit_core(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    }
}
1660
1661pub fn transform_bottom_up(
1662    ir: &mut [HydroRoot],
1663    transform_root: &mut impl FnMut(&mut HydroRoot),
1664    transform_node: &mut impl FnMut(&mut HydroNode),
1665    check_well_formed: bool,
1666) {
1667    let mut seen_tees = HashMap::new();
1668    ir.iter_mut().for_each(|leaf| {
1669        leaf.transform_bottom_up(
1670            transform_root,
1671            transform_node,
1672            &mut seen_tees,
1673            check_well_formed,
1674        );
1675    });
1676}
1677
1678pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1679    let mut seen_tees = HashMap::new();
1680    ir.iter()
1681        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1682        .collect()
1683}
1684
/// Per-thread dedup state: `None` when no scope is active, otherwise
/// `(next id to assign, shared-node pointer → assigned id)`.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    /// Tracks shared nodes already printed by `SharedNode`'s `Debug` impl.
    /// Inside a `dbg_dedup_tee` scope the full subtree is printed only on the
    /// first encounter; later encounters print just `<shared N>`.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
    /// Tracks shared nodes already serialized so that `SharedNode::serialize`
    /// emits the full subtree only once and uses a `{"$shared_ref": N}`
    /// back-reference on subsequent encounters, preventing infinite loops.
    static SERIALIZED_SHARED: PrintedTees
        = const { RefCell::new(None) };
}
1694
1695pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1696    PRINTED_TEES.with(|printed_tees| {
1697        let mut printed_tees_mut = printed_tees.borrow_mut();
1698        *printed_tees_mut = Some((0, HashMap::new()));
1699        drop(printed_tees_mut);
1700
1701        let ret = f();
1702
1703        let mut printed_tees_mut = printed_tees.borrow_mut();
1704        *printed_tees_mut = None;
1705
1706        ret
1707    })
1708}
1709
1710/// Runs `f` with a fresh shared-node deduplication scope for serialization.
1711/// Any `SharedNode` serialized inside `f` will be tracked; the first occurrence
1712/// emits the full subtree while later occurrences emit a `{"$shared_ref": id}`
1713/// back-reference.  The tracking state is restored when `f` returns or panics.
1714pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1715    let _guard = SerializedSharedGuard::enter();
1716    f()
1717}
1718
1719/// RAII guard that saves/restores the `SERIALIZED_SHARED` thread-local,
1720/// making `serialize_dedup_shared` re-entrant and panic-safe.
1721struct SerializedSharedGuard {
1722    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
1723}
1724
1725impl SerializedSharedGuard {
1726    fn enter() -> Self {
1727        let previous = SERIALIZED_SHARED.with(|cell| {
1728            let mut guard = cell.borrow_mut();
1729            guard.replace((0, HashMap::new()))
1730        });
1731        Self { previous }
1732    }
1733}
1734
1735impl Drop for SerializedSharedGuard {
1736    fn drop(&mut self) {
1737        SERIALIZED_SHARED.with(|cell| {
1738            *cell.borrow_mut() = self.previous.take();
1739        });
1740    }
1741}
1742
/// A reference-counted, interior-mutable handle to a [`HydroNode`].
///
/// Used as the `inner` of `HydroNode::Tee` and `HydroNode::Partition`, where
/// several handles may point at the same underlying node.
pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1744
1745impl serde::Serialize for SharedNode {
1746    /// Multiple `SharedNode`s can point to the same underlying `HydroNode` (via
1747    /// `Tee` / `Partition`).  A naïve recursive serialization would revisit the
1748    /// same subtree every time and, if the graph ever contains a cycle, loop
1749    /// forever.
1750    ///
1751    /// We keep a thread-local map (`SERIALIZED_SHARED`) from raw `Rc` pointer →
1752    /// integer id.  The first time we see a pointer we assign it the next id and
1753    /// emit the full subtree as `{"$shared": <id>, "node": …}`.  Every later
1754    /// encounter of the same pointer emits `{"$shared_ref": <id>}`, cutting the
1755    /// recursion.  Requires an active `serialize_dedup_shared` scope.
1756    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
1757        SERIALIZED_SHARED.with(|cell| {
1758            let mut guard = cell.borrow_mut();
1759            // (next_id, pointer → assigned_id)
1760            let state = guard.as_mut().ok_or_else(|| {
1761                serde::ser::Error::custom(
1762                    "SharedNode serialization requires an active serialize_dedup_shared scope",
1763                )
1764            })?;
1765            let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
1766
1767            if let Some(&id) = state.1.get(&ptr) {
1768                drop(guard);
1769                use serde::ser::SerializeMap;
1770                let mut map = serializer.serialize_map(Some(1))?;
1771                map.serialize_entry("$shared_ref", &id)?;
1772                map.end()
1773            } else {
1774                let id = state.0;
1775                state.0 += 1;
1776                state.1.insert(ptr, id);
1777                drop(guard);
1778
1779                use serde::ser::SerializeMap;
1780                let mut map = serializer.serialize_map(Some(2))?;
1781                map.serialize_entry("$shared", &id)?;
1782                map.serialize_entry("node", &*self.0.borrow())?;
1783                map.end()
1784            }
1785        })
1786    }
1787}
1788
1789impl SharedNode {
1790    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1791        Rc::as_ptr(&self.0)
1792    }
1793}
1794
1795impl Debug for SharedNode {
1796    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1797        PRINTED_TEES.with(|printed_tees| {
1798            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1799            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1800
1801            if let Some(printed_tees_mut) = printed_tees_mut {
1802                if let Some(existing) = printed_tees_mut
1803                    .1
1804                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1805                {
1806                    write!(f, "<shared {}>", existing)
1807                } else {
1808                    let next_id = printed_tees_mut.0;
1809                    printed_tees_mut.0 += 1;
1810                    printed_tees_mut
1811                        .1
1812                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1813                    drop(printed_tees_mut_borrow);
1814                    write!(f, "<shared {}>: ", next_id)?;
1815                    Debug::fmt(&self.0.borrow(), f)
1816                }
1817            } else {
1818                drop(printed_tees_mut_borrow);
1819                write!(f, "<shared>: ")?;
1820                Debug::fmt(&self.0.borrow(), f)
1821            }
1822        })
1823    }
1824}
1825
1826impl Hash for SharedNode {
1827    fn hash<H: Hasher>(&self, state: &mut H) {
1828        self.0.borrow_mut().hash(state);
1829    }
1830}
1831
/// Boundedness marker used by the `Stream`, `Optional` and `KeyedStream`
/// variants of [`CollectionKind`]. `CollectionKind::is_bounded` reports `true`
/// for these variants only when the marker is `Bounded`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
1837
/// Ordering marker stored in `CollectionKind::Stream::order` and
/// `CollectionKind::KeyedStream::value_order`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
1843
/// Retry marker stored in `CollectionKind::Stream::retry` and
/// `CollectionKind::KeyedStream::value_retry`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1849
/// Boundedness marker for `CollectionKind::KeyedSingleton`.
///
/// Only `Bounded` makes `CollectionKind::is_bounded` return `true`;
/// `MonotonicValue` and `BoundedValue` do not.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    MonotonicValue,
    BoundedValue,
    Bounded,
}
1857
/// Boundedness marker for `CollectionKind::Singleton`.
///
/// Only `Bounded` makes `CollectionKind::is_bounded` return `true`;
/// `Monotonic` does not.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum SingletonBoundKind {
    Unbounded,
    Monotonic,
    Bounded,
}
1864
/// The kind of collection recorded in `HydroIrMetadata::collection_kind`,
/// together with its boundedness marker and element (or key/value) types.
///
/// `Stream` and `KeyedStream` additionally record an ordering and a retry
/// marker.
#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
pub enum CollectionKind {
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    Singleton {
        bound: SingletonBoundKind,
        element_type: DebugType,
    },
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1894
1895impl CollectionKind {
1896    pub fn is_bounded(&self) -> bool {
1897        matches!(
1898            self,
1899            CollectionKind::Stream {
1900                bound: BoundKind::Bounded,
1901                ..
1902            } | CollectionKind::Singleton {
1903                bound: SingletonBoundKind::Bounded,
1904                ..
1905            } | CollectionKind::Optional {
1906                bound: BoundKind::Bounded,
1907                ..
1908            } | CollectionKind::KeyedStream {
1909                bound: BoundKind::Bounded,
1910                ..
1911            } | CollectionKind::KeyedSingleton {
1912                bound: KeyedSingletonBoundKind::Bounded,
1913                ..
1914            }
1915        )
1916    }
1917}
1918
/// Metadata stored in the `metadata` field of [`HydroNode`] variants.
///
/// Note that this type's `Hash` impl writes nothing and its `PartialEq` impl
/// always returns `true`, so metadata never influences hashing or equality
/// of the values that contain it.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrMetadata {
    /// Location associated with the node; rewritten by `unify_atomic_ticks`.
    pub location_id: LocationId,
    /// Kind of collection recorded for the node.
    pub collection_kind: CollectionKind,
    // NOTE(review): presumably an estimated element count — confirm against producers.
    pub cardinality: Option<usize>,
    // NOTE(review): presumably a user-supplied label — confirm against producers.
    pub tag: Option<String>,
    /// Operator-level metadata (backtrace, usage figures, id).
    pub op: HydroIrOpMetadata,
}
1927
1928// HydroIrMetadata shouldn't be used to hash or compare
1929impl Hash for HydroIrMetadata {
1930    fn hash<H: Hasher>(&self, _: &mut H) {}
1931}
1932
1933impl PartialEq for HydroIrMetadata {
1934    fn eq(&self, _: &Self) -> bool {
1935        true
1936    }
1937}
1938
1939impl Eq for HydroIrMetadata {}
1940
1941impl Debug for HydroIrMetadata {
1942    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1943        f.debug_struct("HydroIrMetadata")
1944            .field("location_id", &self.location_id)
1945            .field("collection_kind", &self.collection_kind)
1946            .finish()
1947    }
1948}
1949
/// Metadata that is specific to the operator itself, rather than its outputs.
/// This is available on _both_ inner nodes and roots.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was created; serialized under
    /// the key `span` via `serialize_backtrace_as_span`.
    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
    pub backtrace: Backtrace,
    // NOTE(review): the next two look like profiling figures filled in after
    // construction (they start as `None`) — confirm units with the producer.
    pub cpu_usage: Option<f64>,
    pub network_recv_cpu_usage: Option<f64>,
    /// Optional identifier; `None` on construction.
    pub id: Option<usize>,
}

impl HydroIrOpMetadata {
    /// Creates metadata with a freshly captured backtrace and every optional
    /// field set to `None`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        // Do not inline `new_with_skip` here: the skip count below is tied to
        // this exact call nesting.
        Self::new_with_skip(1)
    }

    /// Like `new`, but passes `2 + skip_count` to `Backtrace::get_backtrace`.
    // NOTE(review): the argument is presumably the number of innermost frames
    // to drop (this function, its caller, plus `skip_count` more) — confirm
    // against `Backtrace::get_backtrace`.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}

impl Debug for HydroIrOpMetadata {
    /// Prints only the struct name; no fields are included in debug output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HydroIrOpMetadata").finish()
    }
}

impl Hash for HydroIrOpMetadata {
    /// Intentionally feeds nothing into the hasher.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
1989
/// An intermediate node in a Hydro graph, which consumes data
/// from upstream nodes and emits data to downstream nodes.
///
/// Child nodes are owned through `Box<HydroNode>`, except for `Tee` and
/// `Partition`, whose input is a [`SharedNode`] that other nodes may also
/// point at.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroNode {
    /// Stand-in value with no metadata. `transform_children` temporarily
    /// stores it in shared cells while the real node is being transformed,
    /// and panics if asked to traverse a `Placeholder` itself.
    Placeholder,

    /// Manually "casts" between two different collection kinds.
    ///
    /// Using this IR node requires special care, since it bypasses many of Hydro's core
    /// correctness checks. In particular, the user must ensure that every possible
    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
    /// collection. This ensures that the simulator does not miss any possible outputs.
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
    /// interpretation of the input stream.
    ///
    /// In production, this simply passes through the input, but in simulation, this operator
    /// explicitly selects a randomized interpretation.
    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool, // if true, we do not need to simulate non-determinism
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no child nodes) described by a `HydroSource`.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no child nodes) carrying a single `value` expression.
    SingletonSource {
        value: DebugExpr,
        first_tick_only: bool,
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no child nodes) identified by a `CycleId`.
    CycleSource {
        cycle_id: CycleId,
        metadata: HydroIrMetadata,
    },

    /// Reads from a node that may be shared with other `Tee` / `Partition`
    /// nodes through the same [`SharedNode`].
    Tee {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    /// Like `Tee`, the input is a possibly shared [`SharedNode`].
    // NOTE(review): `f` / `is_true` presumably select which side of a
    // predicate this node receives — confirm against the emitter.
    Partition {
        inner: SharedNode,
        f: DebugExpr,
        is_true: bool,
        metadata: HydroIrMetadata,
    },

    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// `unify_atomic_ticks` merges the tick of this node's location with the
    /// tick of its `inner` node's location when both resolve to a tick.
    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// `unify_atomic_ticks` merges the tick of this node's location with the
    /// tick of its `inner` node's location when both resolve to a tick.
    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    MergeOrdered {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Asymmetric join where the right (build) side is bounded.
    /// The build side is accumulated (stratum-delayed) into a hash table,
    /// then the left (probe) side streams through preserving its ordering.
    JoinHalf {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesBlocking {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMapStreamBlocking {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ScanAsyncBlocking {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Has two child nodes: `input` and `watermark` (traversed in that order
    /// by `transform_children`).
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// The only node kind exempt from the location check in
    /// `HydroNode::transform_bottom_up`: its input is allowed to have a
    /// different root location than the node itself.
    Network {
        name: Option<String>,
        networking_info: crate::networking::NetworkingInfo,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no child nodes). `port_hint` is excluded from serialization.
    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        #[serde(skip)]
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
2259
/// Bookkeeping for traversals over shared nodes: maps the address of an
/// original shared cell to the replacement cell created for it, so that every
/// `Tee` / `Partition` that pointed at the same original ends up pointing at
/// the same replacement (see `transform_children` and `deep_clone`).
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a shared cell to a [`LocationId`].
// NOTE(review): presumably the location recorded for that shared node during
// a traversal — the users are outside this part of the file; confirm there.
pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2262
2263impl HydroNode {
2264    pub fn transform_bottom_up(
2265        &mut self,
2266        transform: &mut impl FnMut(&mut HydroNode),
2267        seen_tees: &mut SeenSharedNodes,
2268        check_well_formed: bool,
2269    ) {
2270        self.transform_children(
2271            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2272            seen_tees,
2273        );
2274
2275        transform(self);
2276
2277        let self_location = self.metadata().location_id.root();
2278
2279        if check_well_formed {
2280            match &*self {
2281                HydroNode::Network { .. } => {}
2282                _ => {
2283                    self.input_metadata().iter().for_each(|i| {
2284                        if i.location_id.root() != self_location {
2285                            panic!(
2286                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2287                                i,
2288                                i.location_id.root(),
2289                                self,
2290                                self_location
2291                            )
2292                        }
2293                    });
2294                }
2295            }
2296        }
2297    }
2298
2299    #[inline(always)]
2300    pub fn transform_children(
2301        &mut self,
2302        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2303        seen_tees: &mut SeenSharedNodes,
2304    ) {
2305        match self {
2306            HydroNode::Placeholder => {
2307                panic!();
2308            }
2309
2310            HydroNode::Source { .. }
2311            | HydroNode::SingletonSource { .. }
2312            | HydroNode::CycleSource { .. }
2313            | HydroNode::ExternalInput { .. } => {}
2314
2315            HydroNode::Tee { inner, .. } => {
2316                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2317                    *inner = SharedNode(transformed.clone());
2318                } else {
2319                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2320                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2321                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2322                    transform(&mut orig, seen_tees);
2323                    *transformed_cell.borrow_mut() = orig;
2324                    *inner = SharedNode(transformed_cell);
2325                }
2326            }
2327
2328            HydroNode::Partition { inner, .. } => {
2329                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2330                    *inner = SharedNode(transformed.clone());
2331                } else {
2332                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2333                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2334                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2335                    transform(&mut orig, seen_tees);
2336                    *transformed_cell.borrow_mut() = orig;
2337                    *inner = SharedNode(transformed_cell);
2338                }
2339            }
2340
2341            HydroNode::Cast { inner, .. }
2342            | HydroNode::ObserveNonDet { inner, .. }
2343            | HydroNode::BeginAtomic { inner, .. }
2344            | HydroNode::EndAtomic { inner, .. }
2345            | HydroNode::Batch { inner, .. }
2346            | HydroNode::YieldConcat { inner, .. } => {
2347                transform(inner.as_mut(), seen_tees);
2348            }
2349
2350            HydroNode::Chain { first, second, .. } => {
2351                transform(first.as_mut(), seen_tees);
2352                transform(second.as_mut(), seen_tees);
2353            }
2354
2355            HydroNode::MergeOrdered { first, second, .. } => {
2356                transform(first.as_mut(), seen_tees);
2357                transform(second.as_mut(), seen_tees);
2358            }
2359
2360            HydroNode::ChainFirst { first, second, .. } => {
2361                transform(first.as_mut(), seen_tees);
2362                transform(second.as_mut(), seen_tees);
2363            }
2364
2365            HydroNode::CrossSingleton { left, right, .. }
2366            | HydroNode::CrossProduct { left, right, .. }
2367            | HydroNode::Join { left, right, .. }
2368            | HydroNode::JoinHalf { left, right, .. } => {
2369                transform(left.as_mut(), seen_tees);
2370                transform(right.as_mut(), seen_tees);
2371            }
2372
2373            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2374                transform(pos.as_mut(), seen_tees);
2375                transform(neg.as_mut(), seen_tees);
2376            }
2377
2378            HydroNode::ReduceKeyedWatermark {
2379                input, watermark, ..
2380            } => {
2381                transform(input.as_mut(), seen_tees);
2382                transform(watermark.as_mut(), seen_tees);
2383            }
2384
2385            HydroNode::Map { input, .. }
2386            | HydroNode::ResolveFutures { input, .. }
2387            | HydroNode::ResolveFuturesBlocking { input, .. }
2388            | HydroNode::ResolveFuturesOrdered { input, .. }
2389            | HydroNode::FlatMap { input, .. }
2390            | HydroNode::FlatMapStreamBlocking { input, .. }
2391            | HydroNode::Filter { input, .. }
2392            | HydroNode::FilterMap { input, .. }
2393            | HydroNode::Sort { input, .. }
2394            | HydroNode::DeferTick { input, .. }
2395            | HydroNode::Enumerate { input, .. }
2396            | HydroNode::Inspect { input, .. }
2397            | HydroNode::Unique { input, .. }
2398            | HydroNode::Network { input, .. }
2399            | HydroNode::Fold { input, .. }
2400            | HydroNode::Scan { input, .. }
2401            | HydroNode::ScanAsyncBlocking { input, .. }
2402            | HydroNode::FoldKeyed { input, .. }
2403            | HydroNode::Reduce { input, .. }
2404            | HydroNode::ReduceKeyed { input, .. }
2405            | HydroNode::Counter { input, .. } => {
2406                transform(input.as_mut(), seen_tees);
2407            }
2408        }
2409    }
2410
2411    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2412        match self {
2413            HydroNode::Placeholder => HydroNode::Placeholder,
2414            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2415                inner: Box::new(inner.deep_clone(seen_tees)),
2416                metadata: metadata.clone(),
2417            },
2418            HydroNode::ObserveNonDet {
2419                inner,
2420                trusted,
2421                metadata,
2422            } => HydroNode::ObserveNonDet {
2423                inner: Box::new(inner.deep_clone(seen_tees)),
2424                trusted: *trusted,
2425                metadata: metadata.clone(),
2426            },
2427            HydroNode::Source { source, metadata } => HydroNode::Source {
2428                source: source.clone(),
2429                metadata: metadata.clone(),
2430            },
2431            HydroNode::SingletonSource {
2432                value,
2433                first_tick_only,
2434                metadata,
2435            } => HydroNode::SingletonSource {
2436                value: value.clone(),
2437                first_tick_only: *first_tick_only,
2438                metadata: metadata.clone(),
2439            },
2440            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2441                cycle_id: *cycle_id,
2442                metadata: metadata.clone(),
2443            },
2444            HydroNode::Tee { inner, metadata } => {
2445                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2446                    HydroNode::Tee {
2447                        inner: SharedNode(transformed.clone()),
2448                        metadata: metadata.clone(),
2449                    }
2450                } else {
2451                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2452                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2453                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2454                    *new_rc.borrow_mut() = cloned;
2455                    HydroNode::Tee {
2456                        inner: SharedNode(new_rc),
2457                        metadata: metadata.clone(),
2458                    }
2459                }
2460            }
2461            HydroNode::Partition {
2462                inner,
2463                f,
2464                is_true,
2465                metadata,
2466            } => {
2467                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2468                    HydroNode::Partition {
2469                        inner: SharedNode(transformed.clone()),
2470                        f: f.clone(),
2471                        is_true: *is_true,
2472                        metadata: metadata.clone(),
2473                    }
2474                } else {
2475                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2476                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2477                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2478                    *new_rc.borrow_mut() = cloned;
2479                    HydroNode::Partition {
2480                        inner: SharedNode(new_rc),
2481                        f: f.clone(),
2482                        is_true: *is_true,
2483                        metadata: metadata.clone(),
2484                    }
2485                }
2486            }
2487            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2488                inner: Box::new(inner.deep_clone(seen_tees)),
2489                metadata: metadata.clone(),
2490            },
2491            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2492                inner: Box::new(inner.deep_clone(seen_tees)),
2493                metadata: metadata.clone(),
2494            },
2495            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2496                inner: Box::new(inner.deep_clone(seen_tees)),
2497                metadata: metadata.clone(),
2498            },
2499            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2500                inner: Box::new(inner.deep_clone(seen_tees)),
2501                metadata: metadata.clone(),
2502            },
2503            HydroNode::Chain {
2504                first,
2505                second,
2506                metadata,
2507            } => HydroNode::Chain {
2508                first: Box::new(first.deep_clone(seen_tees)),
2509                second: Box::new(second.deep_clone(seen_tees)),
2510                metadata: metadata.clone(),
2511            },
2512            HydroNode::MergeOrdered {
2513                first,
2514                second,
2515                metadata,
2516            } => HydroNode::MergeOrdered {
2517                first: Box::new(first.deep_clone(seen_tees)),
2518                second: Box::new(second.deep_clone(seen_tees)),
2519                metadata: metadata.clone(),
2520            },
2521            HydroNode::ChainFirst {
2522                first,
2523                second,
2524                metadata,
2525            } => HydroNode::ChainFirst {
2526                first: Box::new(first.deep_clone(seen_tees)),
2527                second: Box::new(second.deep_clone(seen_tees)),
2528                metadata: metadata.clone(),
2529            },
2530            HydroNode::CrossProduct {
2531                left,
2532                right,
2533                metadata,
2534            } => HydroNode::CrossProduct {
2535                left: Box::new(left.deep_clone(seen_tees)),
2536                right: Box::new(right.deep_clone(seen_tees)),
2537                metadata: metadata.clone(),
2538            },
2539            HydroNode::CrossSingleton {
2540                left,
2541                right,
2542                metadata,
2543            } => HydroNode::CrossSingleton {
2544                left: Box::new(left.deep_clone(seen_tees)),
2545                right: Box::new(right.deep_clone(seen_tees)),
2546                metadata: metadata.clone(),
2547            },
2548            HydroNode::Join {
2549                left,
2550                right,
2551                metadata,
2552            } => HydroNode::Join {
2553                left: Box::new(left.deep_clone(seen_tees)),
2554                right: Box::new(right.deep_clone(seen_tees)),
2555                metadata: metadata.clone(),
2556            },
2557            HydroNode::JoinHalf {
2558                left,
2559                right,
2560                metadata,
2561            } => HydroNode::JoinHalf {
2562                left: Box::new(left.deep_clone(seen_tees)),
2563                right: Box::new(right.deep_clone(seen_tees)),
2564                metadata: metadata.clone(),
2565            },
2566            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2567                pos: Box::new(pos.deep_clone(seen_tees)),
2568                neg: Box::new(neg.deep_clone(seen_tees)),
2569                metadata: metadata.clone(),
2570            },
2571            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2572                pos: Box::new(pos.deep_clone(seen_tees)),
2573                neg: Box::new(neg.deep_clone(seen_tees)),
2574                metadata: metadata.clone(),
2575            },
2576            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2577                input: Box::new(input.deep_clone(seen_tees)),
2578                metadata: metadata.clone(),
2579            },
2580            HydroNode::ResolveFuturesBlocking { input, metadata } => {
2581                HydroNode::ResolveFuturesBlocking {
2582                    input: Box::new(input.deep_clone(seen_tees)),
2583                    metadata: metadata.clone(),
2584                }
2585            }
2586            HydroNode::ResolveFuturesOrdered { input, metadata } => {
2587                HydroNode::ResolveFuturesOrdered {
2588                    input: Box::new(input.deep_clone(seen_tees)),
2589                    metadata: metadata.clone(),
2590                }
2591            }
2592            HydroNode::Map { f, input, metadata } => HydroNode::Map {
2593                f: f.clone(),
2594                input: Box::new(input.deep_clone(seen_tees)),
2595                metadata: metadata.clone(),
2596            },
2597            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2598                f: f.clone(),
2599                input: Box::new(input.deep_clone(seen_tees)),
2600                metadata: metadata.clone(),
2601            },
2602            HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2603                HydroNode::FlatMapStreamBlocking {
2604                    f: f.clone(),
2605                    input: Box::new(input.deep_clone(seen_tees)),
2606                    metadata: metadata.clone(),
2607                }
2608            }
2609            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2610                f: f.clone(),
2611                input: Box::new(input.deep_clone(seen_tees)),
2612                metadata: metadata.clone(),
2613            },
2614            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2615                f: f.clone(),
2616                input: Box::new(input.deep_clone(seen_tees)),
2617                metadata: metadata.clone(),
2618            },
2619            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2620                input: Box::new(input.deep_clone(seen_tees)),
2621                metadata: metadata.clone(),
2622            },
2623            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2624                input: Box::new(input.deep_clone(seen_tees)),
2625                metadata: metadata.clone(),
2626            },
2627            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2628                f: f.clone(),
2629                input: Box::new(input.deep_clone(seen_tees)),
2630                metadata: metadata.clone(),
2631            },
2632            HydroNode::Unique { input, metadata } => HydroNode::Unique {
2633                input: Box::new(input.deep_clone(seen_tees)),
2634                metadata: metadata.clone(),
2635            },
2636            HydroNode::Sort { input, metadata } => HydroNode::Sort {
2637                input: Box::new(input.deep_clone(seen_tees)),
2638                metadata: metadata.clone(),
2639            },
2640            HydroNode::Fold {
2641                init,
2642                acc,
2643                input,
2644                metadata,
2645            } => HydroNode::Fold {
2646                init: init.clone(),
2647                acc: acc.clone(),
2648                input: Box::new(input.deep_clone(seen_tees)),
2649                metadata: metadata.clone(),
2650            },
2651            HydroNode::Scan {
2652                init,
2653                acc,
2654                input,
2655                metadata,
2656            } => HydroNode::Scan {
2657                init: init.clone(),
2658                acc: acc.clone(),
2659                input: Box::new(input.deep_clone(seen_tees)),
2660                metadata: metadata.clone(),
2661            },
2662            HydroNode::ScanAsyncBlocking {
2663                init,
2664                acc,
2665                input,
2666                metadata,
2667            } => HydroNode::ScanAsyncBlocking {
2668                init: init.clone(),
2669                acc: acc.clone(),
2670                input: Box::new(input.deep_clone(seen_tees)),
2671                metadata: metadata.clone(),
2672            },
2673            HydroNode::FoldKeyed {
2674                init,
2675                acc,
2676                input,
2677                metadata,
2678            } => HydroNode::FoldKeyed {
2679                init: init.clone(),
2680                acc: acc.clone(),
2681                input: Box::new(input.deep_clone(seen_tees)),
2682                metadata: metadata.clone(),
2683            },
2684            HydroNode::ReduceKeyedWatermark {
2685                f,
2686                input,
2687                watermark,
2688                metadata,
2689            } => HydroNode::ReduceKeyedWatermark {
2690                f: f.clone(),
2691                input: Box::new(input.deep_clone(seen_tees)),
2692                watermark: Box::new(watermark.deep_clone(seen_tees)),
2693                metadata: metadata.clone(),
2694            },
2695            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2696                f: f.clone(),
2697                input: Box::new(input.deep_clone(seen_tees)),
2698                metadata: metadata.clone(),
2699            },
2700            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2701                f: f.clone(),
2702                input: Box::new(input.deep_clone(seen_tees)),
2703                metadata: metadata.clone(),
2704            },
2705            HydroNode::Network {
2706                name,
2707                networking_info,
2708                serialize_fn,
2709                instantiate_fn,
2710                deserialize_fn,
2711                input,
2712                metadata,
2713            } => HydroNode::Network {
2714                name: name.clone(),
2715                networking_info: networking_info.clone(),
2716                serialize_fn: serialize_fn.clone(),
2717                instantiate_fn: instantiate_fn.clone(),
2718                deserialize_fn: deserialize_fn.clone(),
2719                input: Box::new(input.deep_clone(seen_tees)),
2720                metadata: metadata.clone(),
2721            },
2722            HydroNode::ExternalInput {
2723                from_external_key,
2724                from_port_id,
2725                from_many,
2726                codec_type,
2727                port_hint,
2728                instantiate_fn,
2729                deserialize_fn,
2730                metadata,
2731            } => HydroNode::ExternalInput {
2732                from_external_key: *from_external_key,
2733                from_port_id: *from_port_id,
2734                from_many: *from_many,
2735                codec_type: codec_type.clone(),
2736                port_hint: *port_hint,
2737                instantiate_fn: instantiate_fn.clone(),
2738                deserialize_fn: deserialize_fn.clone(),
2739                metadata: metadata.clone(),
2740            },
2741            HydroNode::Counter {
2742                tag,
2743                duration,
2744                prefix,
2745                input,
2746                metadata,
2747            } => HydroNode::Counter {
2748                tag: tag.clone(),
2749                duration: duration.clone(),
2750                prefix: prefix.clone(),
2751                input: Box::new(input.deep_clone(seen_tees)),
2752                metadata: metadata.clone(),
2753            },
2754        }
2755    }
2756
2757    #[cfg(feature = "build")]
2758    pub fn emit_core(
2759        &mut self,
2760        builders_or_callback: &mut BuildersOrCallback<
2761            impl FnMut(&mut HydroRoot, &mut usize),
2762            impl FnMut(&mut HydroNode, &mut usize),
2763        >,
2764        seen_tees: &mut SeenSharedNodes,
2765        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
2766        next_stmt_id: &mut usize,
2767    ) -> syn::Ident {
2768        let mut ident_stack: Vec<syn::Ident> = Vec::new();
2769
2770        self.transform_bottom_up(
2771            &mut |node: &mut HydroNode| {
2772                let out_location = node.metadata().location_id.clone();
2773                match node {
2774                    HydroNode::Placeholder => {
2775                        panic!()
2776                    }
2777
2778                    HydroNode::Cast { .. } => {
2779                        // Cast passes through the input ident unchanged
2780                        // The input ident is already on the stack from processing the child
2781                        match builders_or_callback {
2782                            BuildersOrCallback::Builders(_) => {}
2783                            BuildersOrCallback::Callback(_, node_callback) => {
2784                                node_callback(node, next_stmt_id);
2785                            }
2786                        }
2787
2788                        *next_stmt_id += 1;
2789                        // input_ident stays on stack as output
2790                    }
2791
2792                    HydroNode::ObserveNonDet {
2793                        inner,
2794                        trusted,
2795                        metadata,
2796                        ..
2797                    } => {
2798                        let inner_ident = ident_stack.pop().unwrap();
2799
2800                        let observe_ident =
2801                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2802
2803                        match builders_or_callback {
2804                            BuildersOrCallback::Builders(graph_builders) => {
2805                                graph_builders.observe_nondet(
2806                                    *trusted,
2807                                    &inner.metadata().location_id,
2808                                    inner_ident,
2809                                    &inner.metadata().collection_kind,
2810                                    &observe_ident,
2811                                    &metadata.collection_kind,
2812                                    &metadata.op,
2813                                );
2814                            }
2815                            BuildersOrCallback::Callback(_, node_callback) => {
2816                                node_callback(node, next_stmt_id);
2817                            }
2818                        }
2819
2820                        *next_stmt_id += 1;
2821
2822                        ident_stack.push(observe_ident);
2823                    }
2824
2825                    HydroNode::Batch {
2826                        inner, metadata, ..
2827                    } => {
2828                        let inner_ident = ident_stack.pop().unwrap();
2829
2830                        let batch_ident =
2831                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2832
2833                        match builders_or_callback {
2834                            BuildersOrCallback::Builders(graph_builders) => {
2835                                graph_builders.batch(
2836                                    inner_ident,
2837                                    &inner.metadata().location_id,
2838                                    &inner.metadata().collection_kind,
2839                                    &batch_ident,
2840                                    &out_location,
2841                                    &metadata.op,
2842                                );
2843                            }
2844                            BuildersOrCallback::Callback(_, node_callback) => {
2845                                node_callback(node, next_stmt_id);
2846                            }
2847                        }
2848
2849                        *next_stmt_id += 1;
2850
2851                        ident_stack.push(batch_ident);
2852                    }
2853
2854                    HydroNode::YieldConcat { inner, .. } => {
2855                        let inner_ident = ident_stack.pop().unwrap();
2856
2857                        let yield_ident =
2858                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2859
2860                        match builders_or_callback {
2861                            BuildersOrCallback::Builders(graph_builders) => {
2862                                graph_builders.yield_from_tick(
2863                                    inner_ident,
2864                                    &inner.metadata().location_id,
2865                                    &inner.metadata().collection_kind,
2866                                    &yield_ident,
2867                                    &out_location,
2868                                );
2869                            }
2870                            BuildersOrCallback::Callback(_, node_callback) => {
2871                                node_callback(node, next_stmt_id);
2872                            }
2873                        }
2874
2875                        *next_stmt_id += 1;
2876
2877                        ident_stack.push(yield_ident);
2878                    }
2879
2880                    HydroNode::BeginAtomic { inner, metadata } => {
2881                        let inner_ident = ident_stack.pop().unwrap();
2882
2883                        let begin_ident =
2884                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2885
2886                        match builders_or_callback {
2887                            BuildersOrCallback::Builders(graph_builders) => {
2888                                graph_builders.begin_atomic(
2889                                    inner_ident,
2890                                    &inner.metadata().location_id,
2891                                    &inner.metadata().collection_kind,
2892                                    &begin_ident,
2893                                    &out_location,
2894                                    &metadata.op,
2895                                );
2896                            }
2897                            BuildersOrCallback::Callback(_, node_callback) => {
2898                                node_callback(node, next_stmt_id);
2899                            }
2900                        }
2901
2902                        *next_stmt_id += 1;
2903
2904                        ident_stack.push(begin_ident);
2905                    }
2906
2907                    HydroNode::EndAtomic { inner, .. } => {
2908                        let inner_ident = ident_stack.pop().unwrap();
2909
2910                        let end_ident =
2911                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2912
2913                        match builders_or_callback {
2914                            BuildersOrCallback::Builders(graph_builders) => {
2915                                graph_builders.end_atomic(
2916                                    inner_ident,
2917                                    &inner.metadata().location_id,
2918                                    &inner.metadata().collection_kind,
2919                                    &end_ident,
2920                                );
2921                            }
2922                            BuildersOrCallback::Callback(_, node_callback) => {
2923                                node_callback(node, next_stmt_id);
2924                            }
2925                        }
2926
2927                        *next_stmt_id += 1;
2928
2929                        ident_stack.push(end_ident);
2930                    }
2931
2932                    HydroNode::Source {
2933                        source, metadata, ..
2934                    } => {
2935                        if let HydroSource::ExternalNetwork() = source {
2936                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2937                        } else {
2938                            let source_ident =
2939                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2940
2941                            let source_stmt = match source {
2942                                HydroSource::Stream(expr) => {
2943                                    debug_assert!(metadata.location_id.is_top_level());
2944                                    parse_quote! {
2945                                        #source_ident = source_stream(#expr);
2946                                    }
2947                                }
2948
2949                                HydroSource::ExternalNetwork() => {
2950                                    unreachable!()
2951                                }
2952
2953                                HydroSource::Iter(expr) => {
2954                                    if metadata.location_id.is_top_level() {
2955                                        parse_quote! {
2956                                            #source_ident = source_iter(#expr);
2957                                        }
2958                                    } else {
2959                                        // TODO(shadaj): a more natural semantics would be to re-evaluate the expression on each tick
2960                                        parse_quote! {
2961                                            #source_ident = source_iter(#expr) -> persist::<'static>();
2962                                        }
2963                                    }
2964                                }
2965
2966                                HydroSource::Spin() => {
2967                                    debug_assert!(metadata.location_id.is_top_level());
2968                                    parse_quote! {
2969                                        #source_ident = spin();
2970                                    }
2971                                }
2972
2973                                HydroSource::ClusterMembers(target_loc, state) => {
2974                                    debug_assert!(metadata.location_id.is_top_level());
2975
2976                                    let members_tee_ident = syn::Ident::new(
2977                                        &format!(
2978                                            "__cluster_members_tee_{}_{}",
2979                                            metadata.location_id.root().key(),
2980                                            target_loc.key(),
2981                                        ),
2982                                        Span::call_site(),
2983                                    );
2984
2985                                    match state {
2986                                        ClusterMembersState::Stream(d) => {
2987                                            parse_quote! {
2988                                                #members_tee_ident = source_stream(#d) -> tee();
2989                                                #source_ident = #members_tee_ident;
2990                                            }
2991                                        },
2992                                        ClusterMembersState::Uninit => syn::parse_quote! {
2993                                            #source_ident = source_stream(DUMMY);
2994                                        },
2995                                        ClusterMembersState::Tee(..) => parse_quote! {
2996                                            #source_ident = #members_tee_ident;
2997                                        },
2998                                    }
2999                                }
3000
3001                                HydroSource::Embedded(ident) => {
3002                                    parse_quote! {
3003                                        #source_ident = source_stream(#ident);
3004                                    }
3005                                }
3006
3007                                HydroSource::EmbeddedSingleton(ident) => {
3008                                    parse_quote! {
3009                                        #source_ident = source_iter([#ident]);
3010                                    }
3011                                }
3012                            };
3013
3014                            match builders_or_callback {
3015                                BuildersOrCallback::Builders(graph_builders) => {
3016                                    let builder = graph_builders.get_dfir_mut(&out_location);
3017                                    builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
3018                                }
3019                                BuildersOrCallback::Callback(_, node_callback) => {
3020                                    node_callback(node, next_stmt_id);
3021                                }
3022                            }
3023
3024                            *next_stmt_id += 1;
3025
3026                            ident_stack.push(source_ident);
3027                        }
3028                    }
3029
3030                    HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3031                        let source_ident =
3032                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3033
3034                        match builders_or_callback {
3035                            BuildersOrCallback::Builders(graph_builders) => {
3036                                let builder = graph_builders.get_dfir_mut(&out_location);
3037
3038                                if *first_tick_only {
3039                                    assert!(
3040                                        !metadata.location_id.is_top_level(),
3041                                        "first_tick_only SingletonSource must be inside a tick"
3042                                    );
3043                                }
3044
3045                                if *first_tick_only
3046                                    || (metadata.location_id.is_top_level()
3047                                        && metadata.collection_kind.is_bounded())
3048                                {
3049                                    builder.add_dfir(
3050                                        parse_quote! {
3051                                            #source_ident = source_iter([#value]);
3052                                        },
3053                                        None,
3054                                        Some(&next_stmt_id.to_string()),
3055                                    );
3056                                } else {
3057                                    builder.add_dfir(
3058                                        parse_quote! {
3059                                            #source_ident = source_iter([#value]) -> persist::<'static>();
3060                                        },
3061                                        None,
3062                                        Some(&next_stmt_id.to_string()),
3063                                    );
3064                                }
3065                            }
3066                            BuildersOrCallback::Callback(_, node_callback) => {
3067                                node_callback(node, next_stmt_id);
3068                            }
3069                        }
3070
3071                        *next_stmt_id += 1;
3072
3073                        ident_stack.push(source_ident);
3074                    }
3075
3076                    HydroNode::CycleSource { cycle_id, .. } => {
3077                        let ident = cycle_id.as_ident();
3078
3079                        match builders_or_callback {
3080                            BuildersOrCallback::Builders(_) => {}
3081                            BuildersOrCallback::Callback(_, node_callback) => {
3082                                node_callback(node, next_stmt_id);
3083                            }
3084                        }
3085
3086                        // consume a stmt id even though we did not emit anything so that we can instrument this
3087                        *next_stmt_id += 1;
3088
3089                        ident_stack.push(ident);
3090                    }
3091
3092                    HydroNode::Tee { inner, .. } => {
3093                        let ret_ident = if let Some(built_idents) =
3094                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3095                        {
3096                            match builders_or_callback {
3097                                BuildersOrCallback::Builders(_) => {}
3098                                BuildersOrCallback::Callback(_, node_callback) => {
3099                                    node_callback(node, next_stmt_id);
3100                                }
3101                            }
3102
3103                            built_idents[0].clone()
3104                        } else {
3105                            // The inner node was already processed by transform_bottom_up,
3106                            // so its ident is on the stack
3107                            let inner_ident = ident_stack.pop().unwrap();
3108
3109                            let tee_ident =
3110                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3111
3112                            built_tees.insert(
3113                                inner.0.as_ref() as *const RefCell<HydroNode>,
3114                                vec![tee_ident.clone()],
3115                            );
3116
3117                            match builders_or_callback {
3118                                BuildersOrCallback::Builders(graph_builders) => {
3119                                    let builder = graph_builders.get_dfir_mut(&out_location);
3120                                    builder.add_dfir(
3121                                        parse_quote! {
3122                                            #tee_ident = #inner_ident -> tee();
3123                                        },
3124                                        None,
3125                                        Some(&next_stmt_id.to_string()),
3126                                    );
3127                                }
3128                                BuildersOrCallback::Callback(_, node_callback) => {
3129                                    node_callback(node, next_stmt_id);
3130                                }
3131                            }
3132
3133                            tee_ident
3134                        };
3135
3136                        // we consume a stmt id regardless of if we emit the tee() operator,
3137                        // so that during rewrites we touch all recipients of the tee()
3138
3139                        *next_stmt_id += 1;
3140                        ident_stack.push(ret_ident);
3141                    }
3142
3143                    HydroNode::Partition {
3144                        inner, f, is_true, ..
3145                    } => {
3146                        let is_true = *is_true; // need to copy early to avoid borrow checking issues with node
3147                        let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3148                        let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3149                            match builders_or_callback {
3150                                BuildersOrCallback::Builders(_) => {}
3151                                BuildersOrCallback::Callback(_, node_callback) => {
3152                                    node_callback(node, next_stmt_id);
3153                                }
3154                            }
3155
3156                            let idx = if is_true { 0 } else { 1 };
3157                            built_idents[idx].clone()
3158                        } else {
3159                            // The inner node was already processed by transform_bottom_up,
3160                            // so its ident is on the stack
3161                            let inner_ident = ident_stack.pop().unwrap();
3162
3163                            let partition_ident = syn::Ident::new(
3164                                &format!("stream_{}_partition", *next_stmt_id),
3165                                Span::call_site(),
3166                            );
3167                            let true_ident = syn::Ident::new(
3168                                &format!("stream_{}_true", *next_stmt_id),
3169                                Span::call_site(),
3170                            );
3171                            let false_ident = syn::Ident::new(
3172                                &format!("stream_{}_false", *next_stmt_id),
3173                                Span::call_site(),
3174                            );
3175
3176                            built_tees.insert(
3177                                ptr,
3178                                vec![true_ident.clone(), false_ident.clone()],
3179                            );
3180
3181                            match builders_or_callback {
3182                                BuildersOrCallback::Builders(graph_builders) => {
3183                                    let builder = graph_builders.get_dfir_mut(&out_location);
3184                                    builder.add_dfir(
3185                                        parse_quote! {
3186                                            #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f)(__item) { 0_usize } else { 1_usize });
3187                                            #true_ident = #partition_ident[0];
3188                                            #false_ident = #partition_ident[1];
3189                                        },
3190                                        None,
3191                                        Some(&next_stmt_id.to_string()),
3192                                    );
3193                                }
3194                                BuildersOrCallback::Callback(_, node_callback) => {
3195                                    node_callback(node, next_stmt_id);
3196                                }
3197                            }
3198
3199                            if is_true { true_ident } else { false_ident }
3200                        };
3201
3202                        *next_stmt_id += 1;
3203                        ident_stack.push(ret_ident);
3204                    }
3205
3206                    HydroNode::Chain { .. } => {
3207                        // Children are processed left-to-right, so second is on top
3208                        let second_ident = ident_stack.pop().unwrap();
3209                        let first_ident = ident_stack.pop().unwrap();
3210
3211                        let chain_ident =
3212                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3213
3214                        match builders_or_callback {
3215                            BuildersOrCallback::Builders(graph_builders) => {
3216                                let builder = graph_builders.get_dfir_mut(&out_location);
3217                                builder.add_dfir(
3218                                    parse_quote! {
3219                                        #chain_ident = chain();
3220                                        #first_ident -> [0]#chain_ident;
3221                                        #second_ident -> [1]#chain_ident;
3222                                    },
3223                                    None,
3224                                    Some(&next_stmt_id.to_string()),
3225                                );
3226                            }
3227                            BuildersOrCallback::Callback(_, node_callback) => {
3228                                node_callback(node, next_stmt_id);
3229                            }
3230                        }
3231
3232                        *next_stmt_id += 1;
3233
3234                        ident_stack.push(chain_ident);
3235                    }
3236
3237                    HydroNode::MergeOrdered { first, metadata, .. } => {
3238                        let second_ident = ident_stack.pop().unwrap();
3239                        let first_ident = ident_stack.pop().unwrap();
3240
3241                        let merge_ident =
3242                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3243
3244                        match builders_or_callback {
3245                            BuildersOrCallback::Builders(graph_builders) => {
3246                                graph_builders.merge_ordered(
3247                                    &first.metadata().location_id,
3248                                    first_ident,
3249                                    second_ident,
3250                                    &merge_ident,
3251                                    &first.metadata().collection_kind,
3252                                    &metadata.op,
3253                                    Some(&next_stmt_id.to_string()),
3254                                );
3255                            }
3256                            BuildersOrCallback::Callback(_, node_callback) => {
3257                                node_callback(node, next_stmt_id);
3258                            }
3259                        }
3260
3261                        *next_stmt_id += 1;
3262
3263                        ident_stack.push(merge_ident);
3264                    }
3265
3266                    HydroNode::ChainFirst { .. } => {
3267                        let second_ident = ident_stack.pop().unwrap();
3268                        let first_ident = ident_stack.pop().unwrap();
3269
3270                        let chain_ident =
3271                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3272
3273                        match builders_or_callback {
3274                            BuildersOrCallback::Builders(graph_builders) => {
3275                                let builder = graph_builders.get_dfir_mut(&out_location);
3276                                builder.add_dfir(
3277                                    parse_quote! {
3278                                        #chain_ident = chain_first_n(1);
3279                                        #first_ident -> [0]#chain_ident;
3280                                        #second_ident -> [1]#chain_ident;
3281                                    },
3282                                    None,
3283                                    Some(&next_stmt_id.to_string()),
3284                                );
3285                            }
3286                            BuildersOrCallback::Callback(_, node_callback) => {
3287                                node_callback(node, next_stmt_id);
3288                            }
3289                        }
3290
3291                        *next_stmt_id += 1;
3292
3293                        ident_stack.push(chain_ident);
3294                    }
3295
3296                    HydroNode::CrossSingleton { right, .. } => {
3297                        let right_ident = ident_stack.pop().unwrap();
3298                        let left_ident = ident_stack.pop().unwrap();
3299
3300                        let cross_ident =
3301                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3302
3303                        match builders_or_callback {
3304                            BuildersOrCallback::Builders(graph_builders) => {
3305                                let builder = graph_builders.get_dfir_mut(&out_location);
3306
3307                                if right.metadata().location_id.is_top_level()
3308                                    && right.metadata().collection_kind.is_bounded()
3309                                {
3310                                    builder.add_dfir(
3311                                        parse_quote! {
3312                                            #cross_ident = cross_singleton();
3313                                            #left_ident -> [input]#cross_ident;
3314                                            #right_ident -> persist::<'static>() -> [single]#cross_ident;
3315                                        },
3316                                        None,
3317                                        Some(&next_stmt_id.to_string()),
3318                                    );
3319                                } else {
3320                                    builder.add_dfir(
3321                                        parse_quote! {
3322                                            #cross_ident = cross_singleton();
3323                                            #left_ident -> [input]#cross_ident;
3324                                            #right_ident -> [single]#cross_ident;
3325                                        },
3326                                        None,
3327                                        Some(&next_stmt_id.to_string()),
3328                                    );
3329                                }
3330                            }
3331                            BuildersOrCallback::Callback(_, node_callback) => {
3332                                node_callback(node, next_stmt_id);
3333                            }
3334                        }
3335
3336                        *next_stmt_id += 1;
3337
3338                        ident_stack.push(cross_ident);
3339                    }
3340
3341                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3342                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3343                            parse_quote!(cross_join_multiset)
3344                        } else {
3345                            parse_quote!(join_multiset)
3346                        };
3347
3348                        let (HydroNode::CrossProduct { left, right, .. }
3349                        | HydroNode::Join { left, right, .. }) = node
3350                        else {
3351                            unreachable!()
3352                        };
3353
3354                        let is_top_level = left.metadata().location_id.is_top_level()
3355                            && right.metadata().location_id.is_top_level();
3356                        let left_lifetime = if left.metadata().location_id.is_top_level() {
3357                            quote!('static)
3358                        } else {
3359                            quote!('tick)
3360                        };
3361
3362                        let right_lifetime = if right.metadata().location_id.is_top_level() {
3363                            quote!('static)
3364                        } else {
3365                            quote!('tick)
3366                        };
3367
3368                        let right_ident = ident_stack.pop().unwrap();
3369                        let left_ident = ident_stack.pop().unwrap();
3370
3371                        let stream_ident =
3372                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3373
3374                        match builders_or_callback {
3375                            BuildersOrCallback::Builders(graph_builders) => {
3376                                let builder = graph_builders.get_dfir_mut(&out_location);
3377                                builder.add_dfir(
3378                                    if is_top_level {
3379                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
3380                                        // a multiset_delta() to negate the replay behavior
3381                                        parse_quote! {
3382                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3383                                            #left_ident -> [0]#stream_ident;
3384                                            #right_ident -> [1]#stream_ident;
3385                                        }
3386                                    } else {
3387                                        parse_quote! {
3388                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3389                                            #left_ident -> [0]#stream_ident;
3390                                            #right_ident -> [1]#stream_ident;
3391                                        }
3392                                    }
3393                                    ,
3394                                    None,
3395                                    Some(&next_stmt_id.to_string()),
3396                                );
3397                            }
3398                            BuildersOrCallback::Callback(_, node_callback) => {
3399                                node_callback(node, next_stmt_id);
3400                            }
3401                        }
3402
3403                        *next_stmt_id += 1;
3404
3405                        ident_stack.push(stream_ident);
3406                    }
3407
3408                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3409                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3410                            parse_quote!(difference)
3411                        } else {
3412                            parse_quote!(anti_join)
3413                        };
3414
3415                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3416                            node
3417                        else {
3418                            unreachable!()
3419                        };
3420
3421                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3422                            quote!('static)
3423                        } else {
3424                            quote!('tick)
3425                        };
3426
3427                        let neg_ident = ident_stack.pop().unwrap();
3428                        let pos_ident = ident_stack.pop().unwrap();
3429
3430                        let stream_ident =
3431                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3432
3433                        match builders_or_callback {
3434                            BuildersOrCallback::Builders(graph_builders) => {
3435                                let builder = graph_builders.get_dfir_mut(&out_location);
3436                                builder.add_dfir(
3437                                    parse_quote! {
3438                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
3439                                        #pos_ident -> [pos]#stream_ident;
3440                                        #neg_ident -> [neg]#stream_ident;
3441                                    },
3442                                    None,
3443                                    Some(&next_stmt_id.to_string()),
3444                                );
3445                            }
3446                            BuildersOrCallback::Callback(_, node_callback) => {
3447                                node_callback(node, next_stmt_id);
3448                            }
3449                        }
3450
3451                        *next_stmt_id += 1;
3452
3453                        ident_stack.push(stream_ident);
3454                    }
3455
3456                    HydroNode::JoinHalf { .. } => {
3457                        let HydroNode::JoinHalf { right, .. } = node else {
3458                            unreachable!()
3459                        };
3460
3461                        assert!(
3462                            right.metadata().collection_kind.is_bounded(),
3463                            "JoinHalf requires the right (build) side to be Bounded, got {:?}",
3464                            right.metadata().collection_kind
3465                        );
3466
3467                        let build_lifetime = if right.metadata().location_id.is_top_level() {
3468                            quote!('static)
3469                        } else {
3470                            quote!('tick)
3471                        };
3472
3473                        let build_ident = ident_stack.pop().unwrap();
3474                        let probe_ident = ident_stack.pop().unwrap();
3475
3476                        let stream_ident =
3477                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3478
3479                        match builders_or_callback {
3480                            BuildersOrCallback::Builders(graph_builders) => {
3481                                let builder = graph_builders.get_dfir_mut(&out_location);
3482                                builder.add_dfir(
3483                                    parse_quote! {
3484                                        #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
3485                                        #probe_ident -> [probe]#stream_ident;
3486                                        #build_ident -> [build]#stream_ident;
3487                                    },
3488                                    None,
3489                                    Some(&next_stmt_id.to_string()),
3490                                );
3491                            }
3492                            BuildersOrCallback::Callback(_, node_callback) => {
3493                                node_callback(node, next_stmt_id);
3494                            }
3495                        }
3496
3497                        *next_stmt_id += 1;
3498
3499                        ident_stack.push(stream_ident);
3500                    }
3501
3502                    HydroNode::ResolveFutures { .. } => {
3503                        let input_ident = ident_stack.pop().unwrap();
3504
3505                        let futures_ident =
3506                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3507
3508                        match builders_or_callback {
3509                            BuildersOrCallback::Builders(graph_builders) => {
3510                                let builder = graph_builders.get_dfir_mut(&out_location);
3511                                builder.add_dfir(
3512                                    parse_quote! {
3513                                        #futures_ident = #input_ident -> resolve_futures();
3514                                    },
3515                                    None,
3516                                    Some(&next_stmt_id.to_string()),
3517                                );
3518                            }
3519                            BuildersOrCallback::Callback(_, node_callback) => {
3520                                node_callback(node, next_stmt_id);
3521                            }
3522                        }
3523
3524                        *next_stmt_id += 1;
3525
3526                        ident_stack.push(futures_ident);
3527                    }
3528
3529                    HydroNode::ResolveFuturesBlocking { .. } => {
3530                        let input_ident = ident_stack.pop().unwrap();
3531
3532                        let futures_ident =
3533                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3534
3535                        match builders_or_callback {
3536                            BuildersOrCallback::Builders(graph_builders) => {
3537                                let builder = graph_builders.get_dfir_mut(&out_location);
3538                                builder.add_dfir(
3539                                    parse_quote! {
3540                                        #futures_ident = #input_ident -> resolve_futures_blocking();
3541                                    },
3542                                    None,
3543                                    Some(&next_stmt_id.to_string()),
3544                                );
3545                            }
3546                            BuildersOrCallback::Callback(_, node_callback) => {
3547                                node_callback(node, next_stmt_id);
3548                            }
3549                        }
3550
3551                        *next_stmt_id += 1;
3552
3553                        ident_stack.push(futures_ident);
3554                    }
3555
3556                    HydroNode::ResolveFuturesOrdered { .. } => {
3557                        let input_ident = ident_stack.pop().unwrap();
3558
3559                        let futures_ident =
3560                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3561
3562                        match builders_or_callback {
3563                            BuildersOrCallback::Builders(graph_builders) => {
3564                                let builder = graph_builders.get_dfir_mut(&out_location);
3565                                builder.add_dfir(
3566                                    parse_quote! {
3567                                        #futures_ident = #input_ident -> resolve_futures_ordered();
3568                                    },
3569                                    None,
3570                                    Some(&next_stmt_id.to_string()),
3571                                );
3572                            }
3573                            BuildersOrCallback::Callback(_, node_callback) => {
3574                                node_callback(node, next_stmt_id);
3575                            }
3576                        }
3577
3578                        *next_stmt_id += 1;
3579
3580                        ident_stack.push(futures_ident);
3581                    }
3582
3583                    HydroNode::Map { f, .. } => {
3584                        let input_ident = ident_stack.pop().unwrap();
3585
3586                        let map_ident =
3587                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3588
3589                        match builders_or_callback {
3590                            BuildersOrCallback::Builders(graph_builders) => {
3591                                let builder = graph_builders.get_dfir_mut(&out_location);
3592                                builder.add_dfir(
3593                                    parse_quote! {
3594                                        #map_ident = #input_ident -> map(#f);
3595                                    },
3596                                    None,
3597                                    Some(&next_stmt_id.to_string()),
3598                                );
3599                            }
3600                            BuildersOrCallback::Callback(_, node_callback) => {
3601                                node_callback(node, next_stmt_id);
3602                            }
3603                        }
3604
3605                        *next_stmt_id += 1;
3606
3607                        ident_stack.push(map_ident);
3608                    }
3609
3610                    HydroNode::FlatMap { f, .. } => {
3611                        let input_ident = ident_stack.pop().unwrap();
3612
3613                        let flat_map_ident =
3614                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3615
3616                        match builders_or_callback {
3617                            BuildersOrCallback::Builders(graph_builders) => {
3618                                let builder = graph_builders.get_dfir_mut(&out_location);
3619                                builder.add_dfir(
3620                                    parse_quote! {
3621                                        #flat_map_ident = #input_ident -> flat_map(#f);
3622                                    },
3623                                    None,
3624                                    Some(&next_stmt_id.to_string()),
3625                                );
3626                            }
3627                            BuildersOrCallback::Callback(_, node_callback) => {
3628                                node_callback(node, next_stmt_id);
3629                            }
3630                        }
3631
3632                        *next_stmt_id += 1;
3633
3634                        ident_stack.push(flat_map_ident);
3635                    }
3636
3637                    HydroNode::FlatMapStreamBlocking { f, .. } => {
3638                        let input_ident = ident_stack.pop().unwrap();
3639
3640                        let flat_map_stream_blocking_ident =
3641                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3642
3643                        match builders_or_callback {
3644                            BuildersOrCallback::Builders(graph_builders) => {
3645                                let builder = graph_builders.get_dfir_mut(&out_location);
3646                                builder.add_dfir(
3647                                    parse_quote! {
3648                                        #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f);
3649                                    },
3650                                    None,
3651                                    Some(&next_stmt_id.to_string()),
3652                                );
3653                            }
3654                            BuildersOrCallback::Callback(_, node_callback) => {
3655                                node_callback(node, next_stmt_id);
3656                            }
3657                        }
3658
3659                        *next_stmt_id += 1;
3660
3661                        ident_stack.push(flat_map_stream_blocking_ident);
3662                    }
3663
3664                    HydroNode::Filter { f, .. } => {
3665                        let input_ident = ident_stack.pop().unwrap();
3666
3667                        let filter_ident =
3668                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3669
3670                        match builders_or_callback {
3671                            BuildersOrCallback::Builders(graph_builders) => {
3672                                let builder = graph_builders.get_dfir_mut(&out_location);
3673                                builder.add_dfir(
3674                                    parse_quote! {
3675                                        #filter_ident = #input_ident -> filter(#f);
3676                                    },
3677                                    None,
3678                                    Some(&next_stmt_id.to_string()),
3679                                );
3680                            }
3681                            BuildersOrCallback::Callback(_, node_callback) => {
3682                                node_callback(node, next_stmt_id);
3683                            }
3684                        }
3685
3686                        *next_stmt_id += 1;
3687
3688                        ident_stack.push(filter_ident);
3689                    }
3690
3691                    HydroNode::FilterMap { f, .. } => {
3692                        let input_ident = ident_stack.pop().unwrap();
3693
3694                        let filter_map_ident =
3695                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3696
3697                        match builders_or_callback {
3698                            BuildersOrCallback::Builders(graph_builders) => {
3699                                let builder = graph_builders.get_dfir_mut(&out_location);
3700                                builder.add_dfir(
3701                                    parse_quote! {
3702                                        #filter_map_ident = #input_ident -> filter_map(#f);
3703                                    },
3704                                    None,
3705                                    Some(&next_stmt_id.to_string()),
3706                                );
3707                            }
3708                            BuildersOrCallback::Callback(_, node_callback) => {
3709                                node_callback(node, next_stmt_id);
3710                            }
3711                        }
3712
3713                        *next_stmt_id += 1;
3714
3715                        ident_stack.push(filter_map_ident);
3716                    }
3717
3718                    HydroNode::Sort { .. } => {
3719                        let input_ident = ident_stack.pop().unwrap();
3720
3721                        let sort_ident =
3722                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3723
3724                        match builders_or_callback {
3725                            BuildersOrCallback::Builders(graph_builders) => {
3726                                let builder = graph_builders.get_dfir_mut(&out_location);
3727                                builder.add_dfir(
3728                                    parse_quote! {
3729                                        #sort_ident = #input_ident -> sort();
3730                                    },
3731                                    None,
3732                                    Some(&next_stmt_id.to_string()),
3733                                );
3734                            }
3735                            BuildersOrCallback::Callback(_, node_callback) => {
3736                                node_callback(node, next_stmt_id);
3737                            }
3738                        }
3739
3740                        *next_stmt_id += 1;
3741
3742                        ident_stack.push(sort_ident);
3743                    }
3744
3745                    HydroNode::DeferTick { .. } => {
3746                        let input_ident = ident_stack.pop().unwrap();
3747
3748                        let defer_tick_ident =
3749                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3750
3751                        match builders_or_callback {
3752                            BuildersOrCallback::Builders(graph_builders) => {
3753                                let builder = graph_builders.get_dfir_mut(&out_location);
3754                                builder.add_dfir(
3755                                    parse_quote! {
3756                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
3757                                    },
3758                                    None,
3759                                    Some(&next_stmt_id.to_string()),
3760                                );
3761                            }
3762                            BuildersOrCallback::Callback(_, node_callback) => {
3763                                node_callback(node, next_stmt_id);
3764                            }
3765                        }
3766
3767                        *next_stmt_id += 1;
3768
3769                        ident_stack.push(defer_tick_ident);
3770                    }
3771
3772                    HydroNode::Enumerate { input, .. } => {
3773                        let input_ident = ident_stack.pop().unwrap();
3774
3775                        let enumerate_ident =
3776                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3777
3778                        match builders_or_callback {
3779                            BuildersOrCallback::Builders(graph_builders) => {
3780                                let builder = graph_builders.get_dfir_mut(&out_location);
3781                                let lifetime = if input.metadata().location_id.is_top_level() {
3782                                    quote!('static)
3783                                } else {
3784                                    quote!('tick)
3785                                };
3786                                builder.add_dfir(
3787                                    parse_quote! {
3788                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3789                                    },
3790                                    None,
3791                                    Some(&next_stmt_id.to_string()),
3792                                );
3793                            }
3794                            BuildersOrCallback::Callback(_, node_callback) => {
3795                                node_callback(node, next_stmt_id);
3796                            }
3797                        }
3798
3799                        *next_stmt_id += 1;
3800
3801                        ident_stack.push(enumerate_ident);
3802                    }
3803
3804                    HydroNode::Inspect { f, .. } => {
3805                        let input_ident = ident_stack.pop().unwrap();
3806
3807                        let inspect_ident =
3808                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3809
3810                        match builders_or_callback {
3811                            BuildersOrCallback::Builders(graph_builders) => {
3812                                let builder = graph_builders.get_dfir_mut(&out_location);
3813                                builder.add_dfir(
3814                                    parse_quote! {
3815                                        #inspect_ident = #input_ident -> inspect(#f);
3816                                    },
3817                                    None,
3818                                    Some(&next_stmt_id.to_string()),
3819                                );
3820                            }
3821                            BuildersOrCallback::Callback(_, node_callback) => {
3822                                node_callback(node, next_stmt_id);
3823                            }
3824                        }
3825
3826                        *next_stmt_id += 1;
3827
3828                        ident_stack.push(inspect_ident);
3829                    }
3830
3831                    HydroNode::Unique { input, .. } => {
3832                        let input_ident = ident_stack.pop().unwrap();
3833
3834                        let unique_ident =
3835                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3836
3837                        match builders_or_callback {
3838                            BuildersOrCallback::Builders(graph_builders) => {
3839                                let builder = graph_builders.get_dfir_mut(&out_location);
3840                                let lifetime = if input.metadata().location_id.is_top_level() {
3841                                    quote!('static)
3842                                } else {
3843                                    quote!('tick)
3844                                };
3845
3846                                builder.add_dfir(
3847                                    parse_quote! {
3848                                        #unique_ident = #input_ident -> unique::<#lifetime>();
3849                                    },
3850                                    None,
3851                                    Some(&next_stmt_id.to_string()),
3852                                );
3853                            }
3854                            BuildersOrCallback::Callback(_, node_callback) => {
3855                                node_callback(node, next_stmt_id);
3856                            }
3857                        }
3858
3859                        *next_stmt_id += 1;
3860
3861                        ident_stack.push(unique_ident);
3862                    }
3863
3864                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
3865                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3866                            if input.metadata().location_id.is_top_level()
3867                                && input.metadata().collection_kind.is_bounded()
3868                            {
3869                                parse_quote!(fold_no_replay)
3870                            } else {
3871                                parse_quote!(fold)
3872                            }
3873                        } else if matches!(node, HydroNode::Scan { .. }) {
3874                            parse_quote!(scan)
3875                        } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
3876                            parse_quote!(scan_async_blocking)
3877                        } else if let HydroNode::FoldKeyed { input, .. } = node {
3878                            if input.metadata().location_id.is_top_level()
3879                                && input.metadata().collection_kind.is_bounded()
3880                            {
3881                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
3882                            } else {
3883                                parse_quote!(fold_keyed)
3884                            }
3885                        } else {
3886                            unreachable!()
3887                        };
3888
3889                        let (HydroNode::Fold { input, .. }
3890                        | HydroNode::FoldKeyed { input, .. }
3891                        | HydroNode::Scan { input, .. }
3892                        | HydroNode::ScanAsyncBlocking { input, .. }) = node
3893                        else {
3894                            unreachable!()
3895                        };
3896
3897                        let lifetime = if input.metadata().location_id.is_top_level() {
3898                            quote!('static)
3899                        } else {
3900                            quote!('tick)
3901                        };
3902
3903                        let input_ident = ident_stack.pop().unwrap();
3904
3905                        let (HydroNode::Fold { init, acc, .. }
3906                        | HydroNode::FoldKeyed { init, acc, .. }
3907                        | HydroNode::Scan { init, acc, .. }
3908                        | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
3909                        else {
3910                            unreachable!()
3911                        };
3912
3913                        let fold_ident =
3914                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3915
3916                        match builders_or_callback {
3917                            BuildersOrCallback::Builders(graph_builders) => {
3918                                if matches!(node, HydroNode::Fold { .. })
3919                                    && node.metadata().location_id.is_top_level()
3920                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3921                                    && graph_builders.singleton_intermediates()
3922                                    && !node.metadata().collection_kind.is_bounded()
3923                                {
3924                                    let builder = graph_builders.get_dfir_mut(&out_location);
3925
3926                                    let acc: syn::Expr = parse_quote!({
3927                                        let mut __inner = #acc;
3928                                        move |__state, __value| {
3929                                            __inner(__state, __value);
3930                                            Some(__state.clone())
3931                                        }
3932                                    });
3933
3934                                    builder.add_dfir(
3935                                        parse_quote! {
3936                                            source_iter([(#init)()]) -> [0]#fold_ident;
3937                                            #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3938                                            #fold_ident = chain();
3939                                        },
3940                                        None,
3941                                        Some(&next_stmt_id.to_string()),
3942                                    );
3943                                } else if matches!(node, HydroNode::FoldKeyed { .. })
3944                                    && node.metadata().location_id.is_top_level()
3945                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3946                                    && graph_builders.singleton_intermediates()
3947                                    && !node.metadata().collection_kind.is_bounded()
3948                                {
3949                                    let builder = graph_builders.get_dfir_mut(&out_location);
3950
3951                                    let acc: syn::Expr = parse_quote!({
3952                                        let mut __init = #init;
3953                                        let mut __inner = #acc;
3954                                        move |__state, __kv: (_, _)| {
3955                                            // TODO(shadaj): we can avoid the clone when the entry exists
3956                                            let __state = __state
3957                                                .entry(::std::clone::Clone::clone(&__kv.0))
3958                                                .or_insert_with(|| (__init)());
3959                                            __inner(__state, __kv.1);
3960                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3961                                        }
3962                                    });
3963
3964                                    builder.add_dfir(
3965                                        parse_quote! {
3966                                            #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3967                                        },
3968                                        None,
3969                                        Some(&next_stmt_id.to_string()),
3970                                    );
3971                                } else {
3972                                    let builder = graph_builders.get_dfir_mut(&out_location);
3973                                    builder.add_dfir(
3974                                        parse_quote! {
3975                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3976                                        },
3977                                        None,
3978                                        Some(&next_stmt_id.to_string()),
3979                                    );
3980                                }
3981                            }
3982                            BuildersOrCallback::Callback(_, node_callback) => {
3983                                node_callback(node, next_stmt_id);
3984                            }
3985                        }
3986
3987                        *next_stmt_id += 1;
3988
3989                        ident_stack.push(fold_ident);
3990                    }
3991
3992                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3993                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3994                            if input.metadata().location_id.is_top_level()
3995                                && input.metadata().collection_kind.is_bounded()
3996                            {
3997                                parse_quote!(reduce_no_replay)
3998                            } else {
3999                                parse_quote!(reduce)
4000                            }
4001                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
4002                            if input.metadata().location_id.is_top_level()
4003                                && input.metadata().collection_kind.is_bounded()
4004                            {
4005                                todo!(
4006                                    "Calling keyed reduce on a top-level bounded collection is not supported"
4007                                )
4008                            } else {
4009                                parse_quote!(reduce_keyed)
4010                            }
4011                        } else {
4012                            unreachable!()
4013                        };
4014
4015                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4016                        else {
4017                            unreachable!()
4018                        };
4019
4020                        let lifetime = if input.metadata().location_id.is_top_level() {
4021                            quote!('static)
4022                        } else {
4023                            quote!('tick)
4024                        };
4025
4026                        let input_ident = ident_stack.pop().unwrap();
4027
4028                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4029                        else {
4030                            unreachable!()
4031                        };
4032
4033                        let reduce_ident =
4034                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4035
4036                        match builders_or_callback {
4037                            BuildersOrCallback::Builders(graph_builders) => {
4038                                if matches!(node, HydroNode::Reduce { .. })
4039                                    && node.metadata().location_id.is_top_level()
4040                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4041                                    && graph_builders.singleton_intermediates()
4042                                    && !node.metadata().collection_kind.is_bounded()
4043                                {
4044                                    todo!(
4045                                        "Reduce with optional intermediates is not yet supported in simulator"
4046                                    );
4047                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
4048                                    && node.metadata().location_id.is_top_level()
4049                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4050                                    && graph_builders.singleton_intermediates()
4051                                    && !node.metadata().collection_kind.is_bounded()
4052                                {
4053                                    todo!(
4054                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
4055                                    );
4056                                } else {
4057                                    let builder = graph_builders.get_dfir_mut(&out_location);
4058                                    builder.add_dfir(
4059                                        parse_quote! {
4060                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
4061                                        },
4062                                        None,
4063                                        Some(&next_stmt_id.to_string()),
4064                                    );
4065                                }
4066                            }
4067                            BuildersOrCallback::Callback(_, node_callback) => {
4068                                node_callback(node, next_stmt_id);
4069                            }
4070                        }
4071
4072                        *next_stmt_id += 1;
4073
4074                        ident_stack.push(reduce_ident);
4075                    }
4076
4077                    HydroNode::ReduceKeyedWatermark {
4078                        f,
4079                        input,
4080                        metadata,
4081                        ..
4082                    } => {
4083                        let lifetime = if input.metadata().location_id.is_top_level() {
4084                            quote!('static)
4085                        } else {
4086                            quote!('tick)
4087                        };
4088
4089                        // watermark is processed second, so it's on top
4090                        let watermark_ident = ident_stack.pop().unwrap();
4091                        let input_ident = ident_stack.pop().unwrap();
4092
4093                        let chain_ident = syn::Ident::new(
4094                            &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
4095                            Span::call_site(),
4096                        );
4097
4098                        let fold_ident =
4099                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4100
4101                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4102                            && input.metadata().collection_kind.is_bounded()
4103                        {
4104                            parse_quote!(fold_no_replay)
4105                        } else {
4106                            parse_quote!(fold)
4107                        };
4108
4109                        match builders_or_callback {
4110                            BuildersOrCallback::Builders(graph_builders) => {
4111                                if metadata.location_id.is_top_level()
4112                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4113                                    && graph_builders.singleton_intermediates()
4114                                    && !metadata.collection_kind.is_bounded()
4115                                {
4116                                    todo!(
4117                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4118                                    )
4119                                } else {
4120                                    let builder = graph_builders.get_dfir_mut(&out_location);
4121                                    builder.add_dfir(
4122                                        parse_quote! {
4123                                            #chain_ident = chain();
4124                                            #input_ident
4125                                                -> map(|x| (Some(x), None))
4126                                                -> [0]#chain_ident;
4127                                            #watermark_ident
4128                                                -> map(|watermark| (None, Some(watermark)))
4129                                                -> [1]#chain_ident;
4130
4131                                            #fold_ident = #chain_ident
4132                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4133                                                    let __reduce_keyed_fn = #f;
4134                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4135                                                        if let Some((k, v)) = opt_payload {
4136                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4137                                                                if k < curr_watermark {
4138                                                                    return;
4139                                                                }
4140                                                            }
4141                                                            match map.entry(k) {
4142                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
4143                                                                    e.insert(v);
4144                                                                }
4145                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
4146                                                                    __reduce_keyed_fn(e.get_mut(), v);
4147                                                                }
4148                                                            }
4149                                                        } else {
4150                                                            let watermark = opt_watermark.unwrap();
4151                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4152                                                                if watermark <= curr_watermark {
4153                                                                    return;
4154                                                                }
4155                                                            }
4156                                                            map.retain(|k, _| *k >= watermark);
4157                                                            *opt_curr_watermark = Some(watermark);
4158                                                        }
4159                                                    }
4160                                                })
4161                                                -> flat_map(|(map, _curr_watermark)| map);
4162                                        },
4163                                        None,
4164                                        Some(&next_stmt_id.to_string()),
4165                                    );
4166                                }
4167                            }
4168                            BuildersOrCallback::Callback(_, node_callback) => {
4169                                node_callback(node, next_stmt_id);
4170                            }
4171                        }
4172
4173                        *next_stmt_id += 1;
4174
4175                        ident_stack.push(fold_ident);
4176                    }
4177
4178                    HydroNode::Network {
4179                        networking_info,
4180                        serialize_fn: serialize_pipeline,
4181                        instantiate_fn,
4182                        deserialize_fn: deserialize_pipeline,
4183                        input,
4184                        ..
4185                    } => {
4186                        let input_ident = ident_stack.pop().unwrap();
4187
4188                        let receiver_stream_ident =
4189                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4190
4191                        match builders_or_callback {
4192                            BuildersOrCallback::Builders(graph_builders) => {
4193                                let (sink_expr, source_expr) = match instantiate_fn {
4194                                    DebugInstantiate::Building => (
4195                                        syn::parse_quote!(DUMMY_SINK),
4196                                        syn::parse_quote!(DUMMY_SOURCE),
4197                                    ),
4198
4199                                    DebugInstantiate::Finalized(finalized) => {
4200                                        (finalized.sink.clone(), finalized.source.clone())
4201                                    }
4202                                };
4203
4204                                graph_builders.create_network(
4205                                    &input.metadata().location_id,
4206                                    &out_location,
4207                                    input_ident,
4208                                    &receiver_stream_ident,
4209                                    serialize_pipeline.as_ref(),
4210                                    sink_expr,
4211                                    source_expr,
4212                                    deserialize_pipeline.as_ref(),
4213                                    *next_stmt_id,
4214                                    networking_info,
4215                                );
4216                            }
4217                            BuildersOrCallback::Callback(_, node_callback) => {
4218                                node_callback(node, next_stmt_id);
4219                            }
4220                        }
4221
4222                        *next_stmt_id += 1;
4223
4224                        ident_stack.push(receiver_stream_ident);
4225                    }
4226
4227                    HydroNode::ExternalInput {
4228                        instantiate_fn,
4229                        deserialize_fn: deserialize_pipeline,
4230                        ..
4231                    } => {
4232                        let receiver_stream_ident =
4233                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4234
4235                        match builders_or_callback {
4236                            BuildersOrCallback::Builders(graph_builders) => {
4237                                let (_, source_expr) = match instantiate_fn {
4238                                    DebugInstantiate::Building => (
4239                                        syn::parse_quote!(DUMMY_SINK),
4240                                        syn::parse_quote!(DUMMY_SOURCE),
4241                                    ),
4242
4243                                    DebugInstantiate::Finalized(finalized) => {
4244                                        (finalized.sink.clone(), finalized.source.clone())
4245                                    }
4246                                };
4247
4248                                graph_builders.create_external_source(
4249                                    &out_location,
4250                                    source_expr,
4251                                    &receiver_stream_ident,
4252                                    deserialize_pipeline.as_ref(),
4253                                    *next_stmt_id,
4254                                );
4255                            }
4256                            BuildersOrCallback::Callback(_, node_callback) => {
4257                                node_callback(node, next_stmt_id);
4258                            }
4259                        }
4260
4261                        *next_stmt_id += 1;
4262
4263                        ident_stack.push(receiver_stream_ident);
4264                    }
4265
4266                    HydroNode::Counter {
4267                        tag,
4268                        duration,
4269                        prefix,
4270                        ..
4271                    } => {
4272                        let input_ident = ident_stack.pop().unwrap();
4273
4274                        let counter_ident =
4275                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4276
4277                        match builders_or_callback {
4278                            BuildersOrCallback::Builders(graph_builders) => {
4279                                let arg = format!("{}({})", prefix, tag);
4280                                let builder = graph_builders.get_dfir_mut(&out_location);
4281                                builder.add_dfir(
4282                                    parse_quote! {
4283                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
4284                                    },
4285                                    None,
4286                                    Some(&next_stmt_id.to_string()),
4287                                );
4288                            }
4289                            BuildersOrCallback::Callback(_, node_callback) => {
4290                                node_callback(node, next_stmt_id);
4291                            }
4292                        }
4293
4294                        *next_stmt_id += 1;
4295
4296                        ident_stack.push(counter_ident);
4297                    }
4298                }
4299            },
4300            seen_tees,
4301            false,
4302        );
4303
4304        ident_stack
4305            .pop()
4306            .expect("ident_stack should have exactly one element after traversal")
4307    }
4308
4309    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4310        match self {
4311            HydroNode::Placeholder => {
4312                panic!()
4313            }
4314            HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
4315            HydroNode::Source { source, .. } => match source {
4316                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4317                HydroSource::ExternalNetwork()
4318                | HydroSource::Spin()
4319                | HydroSource::ClusterMembers(_, _)
4320                | HydroSource::Embedded(_)
4321                | HydroSource::EmbeddedSingleton(_) => {} // TODO: what goes here?
4322            },
4323            HydroNode::SingletonSource { value, .. } => {
4324                transform(value);
4325            }
4326            HydroNode::CycleSource { .. }
4327            | HydroNode::Tee { .. }
4328            | HydroNode::YieldConcat { .. }
4329            | HydroNode::BeginAtomic { .. }
4330            | HydroNode::EndAtomic { .. }
4331            | HydroNode::Batch { .. }
4332            | HydroNode::Chain { .. }
4333            | HydroNode::MergeOrdered { .. }
4334            | HydroNode::ChainFirst { .. }
4335            | HydroNode::CrossProduct { .. }
4336            | HydroNode::CrossSingleton { .. }
4337            | HydroNode::ResolveFutures { .. }
4338            | HydroNode::ResolveFuturesBlocking { .. }
4339            | HydroNode::ResolveFuturesOrdered { .. }
4340            | HydroNode::Join { .. }
4341            | HydroNode::JoinHalf { .. }
4342            | HydroNode::Difference { .. }
4343            | HydroNode::AntiJoin { .. }
4344            | HydroNode::DeferTick { .. }
4345            | HydroNode::Enumerate { .. }
4346            | HydroNode::Unique { .. }
4347            | HydroNode::Sort { .. } => {}
4348            HydroNode::Map { f, .. }
4349            | HydroNode::FlatMap { f, .. }
4350            | HydroNode::FlatMapStreamBlocking { f, .. }
4351            | HydroNode::Filter { f, .. }
4352            | HydroNode::FilterMap { f, .. }
4353            | HydroNode::Inspect { f, .. }
4354            | HydroNode::Partition { f, .. }
4355            | HydroNode::Reduce { f, .. }
4356            | HydroNode::ReduceKeyed { f, .. }
4357            | HydroNode::ReduceKeyedWatermark { f, .. } => {
4358                transform(f);
4359            }
4360            HydroNode::Fold { init, acc, .. }
4361            | HydroNode::Scan { init, acc, .. }
4362            | HydroNode::ScanAsyncBlocking { init, acc, .. }
4363            | HydroNode::FoldKeyed { init, acc, .. } => {
4364                transform(init);
4365                transform(acc);
4366            }
4367            HydroNode::Network {
4368                serialize_fn,
4369                deserialize_fn,
4370                ..
4371            } => {
4372                if let Some(serialize_fn) = serialize_fn {
4373                    transform(serialize_fn);
4374                }
4375                if let Some(deserialize_fn) = deserialize_fn {
4376                    transform(deserialize_fn);
4377                }
4378            }
4379            HydroNode::ExternalInput { deserialize_fn, .. } => {
4380                if let Some(deserialize_fn) = deserialize_fn {
4381                    transform(deserialize_fn);
4382                }
4383            }
4384            HydroNode::Counter { duration, .. } => {
4385                transform(duration);
4386            }
4387        }
4388    }
4389
4390    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4391        &self.metadata().op
4392    }
4393
4394    pub fn metadata(&self) -> &HydroIrMetadata {
4395        match self {
4396            HydroNode::Placeholder => {
4397                panic!()
4398            }
4399            HydroNode::Cast { metadata, .. } => metadata,
4400            HydroNode::ObserveNonDet { metadata, .. } => metadata,
4401            HydroNode::Source { metadata, .. } => metadata,
4402            HydroNode::SingletonSource { metadata, .. } => metadata,
4403            HydroNode::CycleSource { metadata, .. } => metadata,
4404            HydroNode::Tee { metadata, .. } => metadata,
4405            HydroNode::Partition { metadata, .. } => metadata,
4406            HydroNode::YieldConcat { metadata, .. } => metadata,
4407            HydroNode::BeginAtomic { metadata, .. } => metadata,
4408            HydroNode::EndAtomic { metadata, .. } => metadata,
4409            HydroNode::Batch { metadata, .. } => metadata,
4410            HydroNode::Chain { metadata, .. } => metadata,
4411            HydroNode::MergeOrdered { metadata, .. } => metadata,
4412            HydroNode::ChainFirst { metadata, .. } => metadata,
4413            HydroNode::CrossProduct { metadata, .. } => metadata,
4414            HydroNode::CrossSingleton { metadata, .. } => metadata,
4415            HydroNode::Join { metadata, .. } => metadata,
4416            HydroNode::JoinHalf { metadata, .. } => metadata,
4417            HydroNode::Difference { metadata, .. } => metadata,
4418            HydroNode::AntiJoin { metadata, .. } => metadata,
4419            HydroNode::ResolveFutures { metadata, .. } => metadata,
4420            HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4421            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4422            HydroNode::Map { metadata, .. } => metadata,
4423            HydroNode::FlatMap { metadata, .. } => metadata,
4424            HydroNode::FlatMapStreamBlocking { metadata, .. } => metadata,
4425            HydroNode::Filter { metadata, .. } => metadata,
4426            HydroNode::FilterMap { metadata, .. } => metadata,
4427            HydroNode::DeferTick { metadata, .. } => metadata,
4428            HydroNode::Enumerate { metadata, .. } => metadata,
4429            HydroNode::Inspect { metadata, .. } => metadata,
4430            HydroNode::Unique { metadata, .. } => metadata,
4431            HydroNode::Sort { metadata, .. } => metadata,
4432            HydroNode::Scan { metadata, .. } => metadata,
4433            HydroNode::ScanAsyncBlocking { metadata, .. } => metadata,
4434            HydroNode::Fold { metadata, .. } => metadata,
4435            HydroNode::FoldKeyed { metadata, .. } => metadata,
4436            HydroNode::Reduce { metadata, .. } => metadata,
4437            HydroNode::ReduceKeyed { metadata, .. } => metadata,
4438            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4439            HydroNode::ExternalInput { metadata, .. } => metadata,
4440            HydroNode::Network { metadata, .. } => metadata,
4441            HydroNode::Counter { metadata, .. } => metadata,
4442        }
4443    }
4444
4445    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
4446        &mut self.metadata_mut().op
4447    }
4448
4449    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
4450        match self {
4451            HydroNode::Placeholder => {
4452                panic!()
4453            }
4454            HydroNode::Cast { metadata, .. } => metadata,
4455            HydroNode::ObserveNonDet { metadata, .. } => metadata,
4456            HydroNode::Source { metadata, .. } => metadata,
4457            HydroNode::SingletonSource { metadata, .. } => metadata,
4458            HydroNode::CycleSource { metadata, .. } => metadata,
4459            HydroNode::Tee { metadata, .. } => metadata,
4460            HydroNode::Partition { metadata, .. } => metadata,
4461            HydroNode::YieldConcat { metadata, .. } => metadata,
4462            HydroNode::BeginAtomic { metadata, .. } => metadata,
4463            HydroNode::EndAtomic { metadata, .. } => metadata,
4464            HydroNode::Batch { metadata, .. } => metadata,
4465            HydroNode::Chain { metadata, .. } => metadata,
4466            HydroNode::MergeOrdered { metadata, .. } => metadata,
4467            HydroNode::ChainFirst { metadata, .. } => metadata,
4468            HydroNode::CrossProduct { metadata, .. } => metadata,
4469            HydroNode::CrossSingleton { metadata, .. } => metadata,
4470            HydroNode::Join { metadata, .. } => metadata,
4471            HydroNode::JoinHalf { metadata, .. } => metadata,
4472            HydroNode::Difference { metadata, .. } => metadata,
4473            HydroNode::AntiJoin { metadata, .. } => metadata,
4474            HydroNode::ResolveFutures { metadata, .. } => metadata,
4475            HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4476            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4477            HydroNode::Map { metadata, .. } => metadata,
4478            HydroNode::FlatMap { metadata, .. } => metadata,
4479            HydroNode::FlatMapStreamBlocking { metadata, .. } => metadata,
4480            HydroNode::Filter { metadata, .. } => metadata,
4481            HydroNode::FilterMap { metadata, .. } => metadata,
4482            HydroNode::DeferTick { metadata, .. } => metadata,
4483            HydroNode::Enumerate { metadata, .. } => metadata,
4484            HydroNode::Inspect { metadata, .. } => metadata,
4485            HydroNode::Unique { metadata, .. } => metadata,
4486            HydroNode::Sort { metadata, .. } => metadata,
4487            HydroNode::Scan { metadata, .. } => metadata,
4488            HydroNode::ScanAsyncBlocking { metadata, .. } => metadata,
4489            HydroNode::Fold { metadata, .. } => metadata,
4490            HydroNode::FoldKeyed { metadata, .. } => metadata,
4491            HydroNode::Reduce { metadata, .. } => metadata,
4492            HydroNode::ReduceKeyed { metadata, .. } => metadata,
4493            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4494            HydroNode::ExternalInput { metadata, .. } => metadata,
4495            HydroNode::Network { metadata, .. } => metadata,
4496            HydroNode::Counter { metadata, .. } => metadata,
4497        }
4498    }
4499
4500    pub fn input(&self) -> Vec<&HydroNode> {
4501        match self {
4502            HydroNode::Placeholder => {
4503                panic!()
4504            }
4505            HydroNode::Source { .. }
4506            | HydroNode::SingletonSource { .. }
4507            | HydroNode::ExternalInput { .. }
4508            | HydroNode::CycleSource { .. }
4509            | HydroNode::Tee { .. }
4510            | HydroNode::Partition { .. } => {
4511                // Tee/Partition should find their input in separate special ways
4512                vec![]
4513            }
4514            HydroNode::Cast { inner, .. }
4515            | HydroNode::ObserveNonDet { inner, .. }
4516            | HydroNode::YieldConcat { inner, .. }
4517            | HydroNode::BeginAtomic { inner, .. }
4518            | HydroNode::EndAtomic { inner, .. }
4519            | HydroNode::Batch { inner, .. } => {
4520                vec![inner]
4521            }
4522            HydroNode::Chain { first, second, .. } => {
4523                vec![first, second]
4524            }
4525            HydroNode::MergeOrdered { first, second, .. } => {
4526                vec![first, second]
4527            }
4528            HydroNode::ChainFirst { first, second, .. } => {
4529                vec![first, second]
4530            }
4531            HydroNode::CrossProduct { left, right, .. }
4532            | HydroNode::CrossSingleton { left, right, .. }
4533            | HydroNode::Join { left, right, .. }
4534            | HydroNode::JoinHalf { left, right, .. } => {
4535                vec![left, right]
4536            }
4537            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
4538                vec![pos, neg]
4539            }
4540            HydroNode::Map { input, .. }
4541            | HydroNode::FlatMap { input, .. }
4542            | HydroNode::FlatMapStreamBlocking { input, .. }
4543            | HydroNode::Filter { input, .. }
4544            | HydroNode::FilterMap { input, .. }
4545            | HydroNode::Sort { input, .. }
4546            | HydroNode::DeferTick { input, .. }
4547            | HydroNode::Enumerate { input, .. }
4548            | HydroNode::Inspect { input, .. }
4549            | HydroNode::Unique { input, .. }
4550            | HydroNode::Network { input, .. }
4551            | HydroNode::Counter { input, .. }
4552            | HydroNode::ResolveFutures { input, .. }
4553            | HydroNode::ResolveFuturesBlocking { input, .. }
4554            | HydroNode::ResolveFuturesOrdered { input, .. }
4555            | HydroNode::Fold { input, .. }
4556            | HydroNode::FoldKeyed { input, .. }
4557            | HydroNode::Reduce { input, .. }
4558            | HydroNode::ReduceKeyed { input, .. }
4559            | HydroNode::Scan { input, .. }
4560            | HydroNode::ScanAsyncBlocking { input, .. } => {
4561                vec![input]
4562            }
4563            HydroNode::ReduceKeyedWatermark {
4564                input, watermark, ..
4565            } => {
4566                vec![input, watermark]
4567            }
4568        }
4569    }
4570
4571    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
4572        self.input()
4573            .iter()
4574            .map(|input_node| input_node.metadata())
4575            .collect()
4576    }
4577
4578    /// Returns `true` if this node is a Tee or Partition whose inner Rc
4579    /// has other live references, meaning the upstream is already driven
4580    /// by another consumer and does not need a Null sink.
4581    pub fn is_shared_with_others(&self) -> bool {
4582        match self {
4583            HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
4584                Rc::strong_count(&inner.0) > 1
4585            }
4586            _ => false,
4587        }
4588    }
4589
4590    pub fn print_root(&self) -> String {
4591        match self {
4592            HydroNode::Placeholder => {
4593                panic!()
4594            }
4595            HydroNode::Cast { .. } => "Cast()".to_owned(),
4596            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
4597            HydroNode::Source { source, .. } => format!("Source({:?})", source),
4598            HydroNode::SingletonSource {
4599                value,
4600                first_tick_only,
4601                ..
4602            } => format!(
4603                "SingletonSource({:?}, first_tick_only={})",
4604                value, first_tick_only
4605            ),
4606            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
4607            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
4608            HydroNode::Partition { f, is_true, .. } => {
4609                format!("Partition({:?}, is_true={})", f, is_true)
4610            }
4611            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
4612            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
4613            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
4614            HydroNode::Batch { .. } => "Batch()".to_owned(),
4615            HydroNode::Chain { first, second, .. } => {
4616                format!("Chain({}, {})", first.print_root(), second.print_root())
4617            }
4618            HydroNode::MergeOrdered { first, second, .. } => {
4619                format!(
4620                    "MergeOrdered({}, {})",
4621                    first.print_root(),
4622                    second.print_root()
4623                )
4624            }
4625            HydroNode::ChainFirst { first, second, .. } => {
4626                format!(
4627                    "ChainFirst({}, {})",
4628                    first.print_root(),
4629                    second.print_root()
4630                )
4631            }
4632            HydroNode::CrossProduct { left, right, .. } => {
4633                format!(
4634                    "CrossProduct({}, {})",
4635                    left.print_root(),
4636                    right.print_root()
4637                )
4638            }
4639            HydroNode::CrossSingleton { left, right, .. } => {
4640                format!(
4641                    "CrossSingleton({}, {})",
4642                    left.print_root(),
4643                    right.print_root()
4644                )
4645            }
4646            HydroNode::Join { left, right, .. } => {
4647                format!("Join({}, {})", left.print_root(), right.print_root())
4648            }
4649            HydroNode::JoinHalf { left, right, .. } => {
4650                format!("JoinHalf({}, {})", left.print_root(), right.print_root())
4651            }
4652            HydroNode::Difference { pos, neg, .. } => {
4653                format!("Difference({}, {})", pos.print_root(), neg.print_root())
4654            }
4655            HydroNode::AntiJoin { pos, neg, .. } => {
4656                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
4657            }
4658            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
4659            HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
4660            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
4661            HydroNode::Map { f, .. } => format!("Map({:?})", f),
4662            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
4663            HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
4664            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
4665            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
4666            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
4667            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
4668            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
4669            HydroNode::Unique { .. } => "Unique()".to_owned(),
4670            HydroNode::Sort { .. } => "Sort()".to_owned(),
4671            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
4672            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
4673            HydroNode::ScanAsyncBlocking { init, acc, .. } => {
4674                format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
4675            }
4676            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
4677            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
4678            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
4679            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
4680            HydroNode::Network { .. } => "Network()".to_owned(),
4681            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
4682            HydroNode::Counter { tag, duration, .. } => {
4683                format!("Counter({:?}, {:?})", tag, duration)
4684            }
4685        }
4686    }
4687}
4688
/// Sets up one network channel between two locations for the deployment
/// backend `D`.
///
/// The deployed node for each endpoint is looked up in `processes` or
/// `clusters`, a fresh port is reserved on each (sender first, then
/// receiver), and the work is handed to the `Deploy` hooks that match the
/// endpoint kinds:
///
/// | sender  | receiver | hooks used                         |
/// |---------|----------|------------------------------------|
/// | process | process  | `o2o_sink_source` / `o2o_connect`  |
/// | process | cluster  | `o2m_sink_source` / `o2m_connect`  |
/// | cluster | process  | `m2o_sink_source` / `m2o_connect`  |
/// | cluster | cluster  | `m2m_sink_source` / `m2m_connect`  |
///
/// `name` and `networking_info` are forwarded unchanged to the
/// `*_sink_source` hook.
///
/// Returns `(sink, source, connect_fn)`: the two expressions produced by the
/// `*_sink_source` hook and the closure produced by the `*_connect` hook.
///
/// # Panics
///
/// Panics if an endpoint has no entry in `processes` / `clusters`, or if
/// either endpoint is a `Tick` or `Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Fetches (and clones) the deployed process registered under `key`.
    let process_node = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| panic!("A process used in the graph was not instantiated: {}", key))
            .clone()
    };
    // Fetches (and clones) the deployed cluster registered under `key`.
    let cluster_node = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| panic!("A cluster used in the graph was not instantiated: {}", key))
            .clone()
    };

    // In every arm the sender is resolved before the receiver and the sender's
    // port is reserved before the receiver's; `*_sink_source` runs before
    // `*_connect`.
    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let from_node = process_node(from);
            let to_node = process_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let from_node = process_node(from);
            let to_node = cluster_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let from_node = cluster_node(from);
            let to_node = process_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let from_node = cluster_node(from);
            let to_node = cluster_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Only process and cluster endpoints are handled above; anything else
        // is rejected with a message naming the offending endpoint.
        (LocationId::Tick(_, _), _) => {
            panic!("cannot instantiate a network whose sender is a tick location")
        }
        (_, LocationId::Tick(_, _)) => {
            panic!("cannot instantiate a network whose receiver is a tick location")
        }
        (LocationId::Atomic(_), _) => {
            panic!("cannot instantiate a network whose sender is an atomic location")
        }
        (_, LocationId::Atomic(_)) => {
            panic!("cannot instantiate a network whose receiver is an atomic location")
        }
    };
    (sink, source, connect_fn)
}
4830
4831#[cfg(test)]
4832mod serde_test;
4833
#[cfg(test)]
mod test {
    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the in-memory size of `HydroNode` so that growth is noticed.
    /// The expected value only holds when the `build`-gated fields exist.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        assert_eq!(std::mem::size_of::<HydroNode>(), 248);
    }

    /// Pins the in-memory size of `HydroRoot` so that growth is noticed.
    /// The expected value only holds when the `build`-gated fields exist.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        assert_eq!(std::mem::size_of::<HydroRoot>(), 136);
    }

    /// An expression that involves no `q!` at all must come back unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let plain: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(plain.clone());
        assert_eq!(result, plain);
    }

    /// Runs the simplifier on the expression a real `q!` closure splices to.
    /// The visitor is expected to recognise the
    /// `stageleft::runtime_support::fn_*` pattern and reduce it to `q!(...)`;
    /// the exact output is checked against the stored snapshot.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let spliced = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Same as above, but the quoted code is a block that ends in a closure,
    /// so it does not begin with a `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let spliced = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}