import networkx as nx
import numpy as np
def custom_equivalent_random_network(G, niter=1, connectivity=True, seed=None):
"""
- `G`의 node degree들을 그대로 유지하고, edge만을 변경하여 리턴함.
"""
RandomGenerator = np.random.RandomState(seed=seed)
G = G.copy()
nodes, degrees = zip(*G.degree()) # keys, degree
# degree의 값에 따라서, node를 선정함.
# nx.random_reference 에서는 cdf를 사용했으나, 차이가 없고 더 직관적이므로 pdf를 사용함.
# degree가 클수록 edge가 바뀔 가능성이 크며,
pdf_by_degree = [d/sum(degrees) for d in degrees]
# niter: niter 개의 edge exchange를 시도함.
niter *= int(len(G.edges()))
# ntries: 매 edge exchange 시도 때마다 몇번까지 시도할 것인가,
# 가령, 확률적으로 어떤 경우에는 valid한 edge exchange가 없을 수 있음.
# 1) 서로 다른 a,b,c,d,가 없거나, 2) 어떻게 잘라도 connectivity가 깨지거나.
# 따라서, 어떤 횟수까지만 시도하고, 넘어갈 경우에는 그냥 edge를 교환하지 못해도 그냥 넘어감.
# 이 부분은 기존 nx.random_reference에 있는 값을 그대로 참고함.
ntries = int(len(G)*len(G.edges()))
ntries/= int(len(G)*(len(G)-1)/2)
ntries = int(ntries)
# edge_exchange_count: 성공한 edge exchange의 수를 셈.
edge_exchange_count = 0
for i in range(0, niter):
for j in range(0, ntries):
# degree dist에 따라, 클수록 확률이 높도록 서로 다른 두 노드를 선택함.
# 엄밀히 따지면, 이렇게 2개를 뽑을 때의 확률은 각각 뽑을 때의 확률과 다름.
# 하지만 귀찮으므로 그냥 이렇게 함.
a, b = RandomGenerator.choice(nodes,
size=2,
p=pdf_by_degree,
replace=False)
# new_b_nbr: uniform하게 선정된 a의 neighbor
new_b_nbr = RandomGenerator.choice(list(G[a]), size=1)[0]
# new_a_nbr: uniform하게 선정된 b의 neighbor
new_a_nbr = RandomGenerator.choice(list(G[b]), size=1)[0]
# 따라서, 이 과정에서 문제가 없기 위해서는, a, b, c, d 모두 서로 달라야 함.
# 이미 a!=b, a!=c, b!=d 의 조건은 만족하며,
# a!=d, b!=c, c!=d 인지 체크하는 것이 필요함.
if (a != new_a_nbr) and (b != new_b_nbr) and (new_b_nbr != new_a_nbr):
# 또한, 이미 연결되어 있는 것이 아닐때, edge exchange가 가능함.
if (new_a_nbr not in G[a]) and (new_b_nbr not in G[b]):
# a, b, c, d가 모두 다르고, 이미 연결되어 있는 것이 아니므로, edge exchange가 가능함.
# edge를 교환함:
# (a - new_a_nbr) ==> (a - new_b_nbr),
# (b - new_b_nbr) ==> (b - new_a_nbr)
G.add_edge(a, new_a_nbr)
G.add_edge(b, new_b_nbr)
G.remove_edge(a, new_b_nbr)
G.remove_edge(b, new_a_nbr)
# argument에서 connectivity가 유지되도록 설정한 경우
if connectivity==True and nx.is_connected(G) is True:
# edge를 exchange해도 connectivity가 유지되므로 valid G
edge_exchange_count+=1
break
else:
# connectivity가 유지되지 않으므로, edge exchange에 실패함.
# 따라서 기존의 G로 돌아감.
G.remove_edge(a, new_a_nbr),
G.remove_edge(b, new_b_nbr),
G.add_edge(a, new_b_nbr),
G.add_edge(b, new_a_nbr)
continue
### each exchange range(0, ntri)
### edge exchange range(0, ntries)
# print(f"{niter} exchange try, {edge_exchange_count} edge exchange success")
return G
########################
# small-world graph
# 엄밀히 따지면, small-world는 아니지만 그냥 씁니다.
small_world_G = nx.karate_club_graph()
assert nx.is_connected(small_world_G) == True
########################
# random_network
clustering_coef_lst = []
shrt_pth_l_lst = []
for i in range(0, 10):
# equivalent random network를 만들고,
equiv_random_G = custom_equivalent_random_network(small_world_G, seed=i)
assert nx.is_connected(equiv_random_G) == True
clustering_coef_lst.append(nx.average_clustering(equiv_random_G))
shrt_pth_l_lst.append(nx.average_shortest_path_length(equiv_random_G))
print("--" * 20)
print("== small world")
print(f"average clustering : {nx.average_clustering(small_world_G)}")
print(f"average shrt path l: {nx.average_shortest_path_length(small_world_G)}")
print("--" * 20)
print("== average from random networks")
print(f"avg clustering from random networks: {np.mean(clustering_coef_lst)}")
print(f"avg short path l from random networks: {np.mean(shrt_pth_l_lst)}")
print("--" * 20)
댓글남기기