1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
//! Some useful functions for placement strategies.

use dslab_core::Id;
use dslab_dfs::dfs::DistributedFileSystem;
use rand::Rng;
use rand_pcg::Pcg64;

use crate::data_item::DataItem;

/// Helper function to collect all inputs along with the location of each one.
///
/// * `DataItem::Replicated` is split into individual chunks.
/// * `DataItem::Local` is split into parts with the size of [chunk_size](DistributedFileSystem::chunk_size) except at most one.
/// * `DataItem::Chunk` remains unchanged.
pub fn collect_all_input(input: &[DataItem], dfs: &DistributedFileSystem) -> Vec<(DataItem, Vec<Id>)> {
    let mut all_inputs = Vec::new();
    for data_item in input.iter() {
        match data_item {
            DataItem::Chunk { chunk_id, .. } => {
                all_inputs.push((
                    *data_item,
                    dfs.chunk_location(*chunk_id)
                        .map(|locations| locations.iter().copied().collect::<Vec<Id>>())
                        .unwrap_or_default(),
                ));
            }
            DataItem::Local { mut size, host } => {
                while size > 0 {
                    let size_here = if size >= dfs.chunk_size() * 2 {
                        dfs.chunk_size()
                    } else {
                        size
                    };
                    all_inputs.push((
                        DataItem::Local {
                            size: size_here,
                            host: *host,
                        },
                        vec![*host],
                    ));
                    size -= size_here;
                }
            }
            DataItem::Replicated { data_id, .. } => {
                for &chunk_id in dfs.data_chunks(*data_id).unwrap().iter() {
                    all_inputs.push((
                        DataItem::Chunk {
                            size: dfs.chunk_size(),
                            chunk_id,
                        },
                        dfs.chunk_location(chunk_id)
                            .map(|locations| locations.iter().copied().collect::<Vec<Id>>())
                            .unwrap_or_default(),
                    ));
                }
            }
        }
    }
    all_inputs
}

/// Helper function to randomly shuffle an array.
pub fn shuffle<T>(rng: &mut Pcg64, data: &mut [T]) {
    for i in 1..data.len() {
        data.swap(i, rng.gen_range(0..=i));
    }
}