from numpy import floor
[docs]
def flatten_index(
n1: "int",
n2: "int",
n3: "int",
nx: "int",
ny: "int",
nz: "int",
algo_in: "str" = None,
):
"""Find the global index of a box based on its index in all 3 direction.
At the moment this is simply sorted according to x then y then z but in the future
more evolved indexing could be implemented.
"""
if algo_in is None:
algo = "fortran_ordering"
else:
algo = algo_in
if algo == "fortran_ordering":
n_glob = n1 + n2 * (nx + 2) + n3 * (nx + 2) * (ny + 2)
elif algo == "c_ordering":
n_glob = n3 + n2 * (nz + 2) + n1 * (nz + 2) * (ny + 2)
else:
n_glob = -99
print(algo, "is not implemented, n_glob set to -99 !!!")
return n_glob
[docs]
def unflatten_index(
n_glob: "int",
nx: "int",
ny: "int",
nz: "int",
algo_in: "str" = None,
):
"""Find the multi-index (i, j, k) of a box based on its global flattened index.
At the moment this is simply sorted according to x then y then z but in the future
more evolved indexing could be implemented.
"""
if algo_in is None:
algo = "fortran_ordering"
else:
algo = algo_in
if algo == "fortran_ordering":
n3 = n_glob // ((nx + 2) * (ny + 2))
rest = n_glob - n3 * (nx + 2) * (ny + 2)
n2 = rest // (nx + 2)
n1 = rest - n2 * (nx + 2)
elif algo == "c_ordering":
n1 = n_glob // ((nz + 2) * (ny + 2))
rest = n_glob - n1 * (nz + 2) * (ny + 2)
n2 = rest // (nz + 2)
n3 = rest - n2 * (nz + 2)
else:
n1 = -99
n2 = -99
n3 = -99
print(algo, "is not implemented, n1, n2 and n3 set to -99 !!!")
return n1, n2, n3
[docs]
def initialize_neighbours(
neighbour: "int[:,:]",
nx: "int",
ny: "int",
nz: "int",
):
"""Initialize the list of neighbours of boxes (find the index of the
neighbours and put the in the right line of the neighbouring array)."""
for k in range(nz + 2):
for j in range(ny + 2):
for i in range(nx + 2):
counter = 0
loc_box = flatten_index(i, j, k, nx, ny, nz)
for kk in range(k - 1, k + 2):
k_box = kk % (nz + 2)
for jj in range(j - 1, j + 2):
j_box = jj % (ny + 2)
for ii in range(i - 1, i + 2):
i_box = ii % (nx + 2)
neig_box = flatten_index(i_box, j_box, k_box, nx, ny, nz)
neighbour[loc_box, counter] = neig_box
counter += 1
[docs]
def find_box(
eta1: float,
eta2: float,
eta3: float,
nx: "int",
ny: "int",
nz: "int",
domain_array: "float[:]",
):
"""Return the number of the box in which the point (eta1, eta2, eta3) is located,
or -1 if the point is not on the process.
Parameters
----------
eta1, eta2, eta3 : array
Logical evaluation points.
nx, ny, nz : int
Number of boxes in each dimension, per mpi process.
domain_array : array
Information of the domain on the current mpi process.
"""
# offset if point is on left boundary
if eta1 == domain_array[0]:
eta1 += 1e-8
if eta2 == domain_array[3]:
eta2 += 1e-8
if eta3 == domain_array[6]:
eta3 += 1e-8
# offset if point is on right boundary
if eta1 == domain_array[1]:
eta1 -= 1e-8
if eta2 == domain_array[4]:
eta2 -= 1e-8
if eta3 == domain_array[7]:
eta3 -= 1e-8
# Leave some room before and after the end of the domain for the box coming from neighbouring processes
x_l = domain_array[0] - (domain_array[1] - domain_array[0]) / nx
x_r = domain_array[1] + (domain_array[1] - domain_array[0]) / nx
y_l = domain_array[3] - (domain_array[4] - domain_array[3]) / ny
y_r = domain_array[4] + (domain_array[4] - domain_array[3]) / ny
z_l = domain_array[6] - (domain_array[7] - domain_array[6]) / nz
z_r = domain_array[7] + (domain_array[7] - domain_array[6]) / nz
if eta1 < x_l or eta1 > x_r or eta2 < y_l or eta2 > y_r or eta3 < z_l or eta3 > z_r:
return -1
n1 = int(floor((eta1 - x_l) / (x_r - x_l) * (nx + 2)))
n2 = int(floor((eta2 - y_l) / (y_r - y_l) * (ny + 2)))
n3 = int(floor((eta3 - z_l) / (z_r - z_l) * (nz + 2)))
return flatten_index(n1, n2, n3, nx, ny, nz)
[docs]
def get_next_index(box_nb: "float", next_index: "int[:]", cumul_next_index: "int[:]"):
"""Utilities for sorting"""
int_bnb = int(box_nb)
index = cumul_next_index[int_bnb] + next_index[int_bnb]
next_index[int_bnb] += 1
return index
[docs]
def sort_boxed_particles(
markers: "float[:,:]",
swap_line_1: "float[:]",
swap_line_2: "float[:]",
n_boxes: "int",
next_index: "int[:]",
cumul_next_index: "int[:]",
box_index: "int" = -2,
):
"""Sort the particules taking advantage of the boxes"""
c = 0
for i in range(next_index.shape[0]):
cumul_next_index[i] = c
c += next_index[i]
cumul_next_index[-1] = c
next_index[:] = 0
l = len(swap_line_2)
loc_i = 0
for box in range(n_boxes):
# skip the ones that are already in good position
loc_i += next_index[box]
# loop through the rest of the box
while loc_i < cumul_next_index[box + 1]:
box_i = int(markers[loc_i, l + box_index])
if box_i == box:
# is it in good position? if yes go to the next one
# and say that this position is no longer available
loc_i += 1
next_index[box] += 1
else:
# swap elements until comming back
swap_i = -1
swap_line_1[:] = markers[loc_i, :]
while swap_i != loc_i:
# where should the actual particule go, put it and save data from the particle that should now move
swap_i = get_next_index(swap_line_1[l + box_index], next_index, cumul_next_index)
swap_line_2[:] = markers[swap_i, :]
markers[swap_i, :] = swap_line_1[:]
swap_line_1[:] = swap_line_2[:]
loc_i += 1
[docs]
def assign_box_to_each_particle(
markers: "float[:,:]",
holes: "bool[:]",
nx: "int",
ny: "int",
nz: "int",
domain_array: "float[:]",
box_index: "int" = -2,
):
"""Assign the right box to each particle, written into column -2 or marker array."""
l = markers.shape[1]
for p in range(markers.shape[0]):
if holes[p]:
n_box = (nx + 2) * (ny + 2) * (nz + 2)
else:
a = find_box(
markers[p, 0],
markers[p, 1],
markers[p, 2],
nx,
ny,
nz,
domain_array,
)
if a >= (nx + 2) * (ny + 2) * (nz + 2) or a < 0:
n_box = (nx + 2) * (ny + 2) * (nz + 2)
else:
n_box = a
b = float(n_box)
markers[p, l + box_index] = b
[docs]
def assign_particles_to_boxes(
markers: "float[:,:]",
holes: "bool[:]",
boxes: "int[:,:]",
next_index: "int[:]",
box_index: "int" = -2,
):
"""Write particle indices into box array."""
boxes[:, :] = -1
next_index[:] = 0
l = markers.shape[1]
for p in range(markers.shape[0]):
if holes[p]:
continue
else:
a = int(markers[p, l + box_index])
boxes[a, next_index[a]] = p
next_index[a] += 1