1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
//! Randomized placement strategy.

use std::collections::BTreeMap;

use dslab_core::Id;
use dslab_dfs::dfs::DistributedFileSystem;
use dslab_network::Network;
use rand::{Rng, SeedableRng};
use rand_pcg::Pcg64;

use crate::{
    compute_host::ComputeHost,
    dag::{Dag, Stage},
    data_item::DataItem,
    placement_strategy::{PlacementStrategy, TaskPlacement},
};

use super::common::{collect_all_input, shuffle};

/// Randomized placement strategy.
///
/// Every task of a stage is assigned to a compute host drawn at random from the
/// set of known hosts, independently of the other tasks.
/// The stage inputs are shuffled and then dealt out to the tasks in a round-robin
/// fashion, so input counts of any two tasks differ by at most one.
///
/// All randomness comes from the internal generator, which is seeded in the
/// constructor; the same sequence of calls therefore yields the same placements.
pub struct RandomPlacementStrategy {
    // Pseudo-random generator used both for shuffling inputs and for picking hosts.
    rng: Pcg64,
}

impl RandomPlacementStrategy {
    /// Seed used by [`RandomPlacementStrategy::new`].
    pub const DEFAULT_SEED: u64 = 123;

    /// Creates a strategy whose random generator is seeded with
    /// [`RandomPlacementStrategy::DEFAULT_SEED`].
    ///
    /// Because the seed is fixed, two strategies created this way produce the
    /// same placements when given the same sequence of calls.
    pub fn new() -> Self {
        Self::with_seed(Self::DEFAULT_SEED)
    }

    /// Creates a strategy whose random generator is seeded with `seed`.
    ///
    /// Use this to obtain different (but still reproducible) random placements
    /// across simulation runs.
    pub fn with_seed(seed: u64) -> Self {
        Self {
            rng: Pcg64::seed_from_u64(seed),
        }
    }
}

impl PlacementStrategy for RandomPlacementStrategy {
    /// Produces one [`TaskPlacement`] per task of `stage`.
    ///
    /// The first component of every entry returned by `collect_all_input` is
    /// gathered into a list, which is then passed through the shared `shuffle`
    /// helper together with this strategy's generator. Task `k` receives the
    /// elements at positions `k`, `k + n`, `k + 2n`, ... of that list, where `n`
    /// is the number of tasks in the stage. The host of each task is picked
    /// independently by drawing a random index into the keys of
    /// `compute_host_info`.
    ///
    /// The DAG id, the graph, the pre-shuffled input and the network are ignored.
    ///
    /// # Panics
    ///
    /// Panics if the stage has at least one task and `compute_host_info` is
    /// empty, because a host index cannot be sampled from an empty range.
    fn place_stage(
        &mut self,
        _dag_id: usize,
        stage: &Stage,
        _graph: &Dag,
        input_data: &[DataItem],
        _input_data_shuffled: &[Vec<DataItem>],
        dfs: &DistributedFileSystem,
        compute_host_info: &BTreeMap<Id, ComputeHost>,
        _network: &Network,
    ) -> Vec<TaskPlacement> {
        let mut shuffled_inputs: Vec<_> = collect_all_input(input_data, dfs)
            .into_iter()
            .map(|entry| entry.0)
            .collect();
        // The shuffle must consume randomness before any host is drawn so that
        // the generator state evolves in a fixed, reproducible order.
        shuffle(&mut self.rng, &mut shuffled_inputs);

        let candidate_hosts: Vec<_> = compute_host_info.keys().copied().collect();
        let task_count = stage.tasks().len();

        let mut placements = Vec::with_capacity(task_count);
        for task_idx in 0..task_count {
            let host = candidate_hosts[self.rng.gen_range(0..candidate_hosts.len())];
            // Round-robin deal: every `task_count`-th input starting at `task_idx`.
            let input = shuffled_inputs
                .iter()
                .skip(task_idx)
                .step_by(task_count)
                .copied()
                .collect();
            placements.push(TaskPlacement { host, input });
        }
        placements
    }
}

impl Default for RandomPlacementStrategy {
    fn default() -> Self {
        Self::new()
    }
}