监视器
本教程作者: fangwei123456
在 :class:spikingjelly.activation_based.monitor
中定义了几个通用的监视器类,用户可以使用这些监视器实现复杂的数据记录功能 。下面以一个简单的网络为例进行介绍。
1.基本使用
所有的监视器的用法类似,以spikingjelly.activation_based.monitor.OutputMonitor
为例进行介绍。
首先我们搭建起一个简单的多步网络。为了避免无脉冲释放,我们将权重全部设置为正值:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import torchimport torch.nn as nnfrom spikingjelly.activation_based import monitor, neuron, functional, layernet = nn.Sequential( layer.Linear(8 , 4 ), neuron.IFNode(), layer.Linear(4 , 2 ), neuron.IFNode() ) for param in net.parameters(): param.data.abs_() functional.set_step_mode(net, 'm' )
spikingjelly.activation_based.monitor.OutputMonitor
可以记录网络中任何类型为 instance
的模块的输出。
脉冲神经元层的输出即为脉冲,因此我们可以使用 :class:OutputMonitor <spikingjelly.activation_based.monitor.OutputMonitor>
来构建一个脉冲监视器,记录网络中所有 neuron.IFNode
的输出脉冲:
1 2 3 4 5 6 7 8 spike_seq_monitor = monitor.OutputMonitor(net, neuron.IFNode) T = 4 N = 1 x_seq = torch.rand([T, N, 8 ]) with torch.no_grad(): net(x_seq)
要记录的数据,会根据生成顺序,保存在 .records
的 list
中:
1 2 3 print (f'spike_seq_monitor.records=\n{spike_seq_monitor.records} ' )
输出为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 spike_seq_monitor.records= [tensor([[[0., 0., 0., 0.]], [[1., 1., 1., 1.]], [[0., 0., 0., 0.]], [[1., 1., 1., 1.]]]), tensor([[[0., 0.]], [[1., 0.]], [[0., 1.]], [[1., 0.]]])]
也可以使用索引操作,直接访问被记录的第 i
个数据:
1 2 3 print (f'spike_seq_monitor[0]={spike_seq_monitor[0 ]} ' )
输出为:
1 2 3 4 5 6 7 8 9 spike_seq_monitor[0]=tensor([[[0., 0., 0., 0.]], [[1., 1., 1., 1.]], [[0., 0., 0., 0.]], [[1., 1., 1., 1.]]])
.monitored_layers
记录了被监视器监控的层的名字:
1 2 3 4 print (f'net={net} ' )print (f'spike_seq_monitor.monitored_layers={spike_seq_monitor.monitored_layers} ' )
输出为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 net=Sequential( (0): Linear(in_features=8, out_features=4, bias=True) (1): IFNode( v_threshold=1.0, v_reset=0.0, detach_reset=False, step_mode=m, backend=torch (surrogate_function): Sigmoid(alpha=4.0, spiking=True) ) (2): Linear(in_features=4, out_features=2, bias=True) (3): IFNode( v_threshold=1.0, v_reset=0.0, detach_reset=False, step_mode=m, backend=torch (surrogate_function): Sigmoid(alpha=4.0, spiking=True) ) ) spike_seq_monitor.monitored_layers=['1', '3']
可以直接通过层的名字作为索引,访问某一层被记录的数据。这返回的是一个 list
:
1 2 3 print (f"spike_seq_monitor['1']={spike_seq_monitor['1' ]} " )
输出为:
1 2 3 4 5 6 7 8 9 spike_seq_monitor['1']=[tensor([[[0., 0., 0., 0.]], [[1., 1., 1., 1.]], [[0., 0., 0., 0.]], [[1., 1., 1., 1.]]])]
可以通过调用 .clear_recorded_data()
来清空已经记录的数据:
1 2 3 4 5 spike_seq_monitor.clear_recorded_data() print (f'spike_seq_monitor.records={spike_seq_monitor.records} ' )print (f"spike_seq_monitor['1']={spike_seq_monitor['1' ]} " )
输出为:
1 2 3 4 spike_seq_monitor.records=[] spike_seq_monitor['1']=[]
所有的 monitor
在析构时都会自动删除已经注册的钩子,但python的内存回收机制并不保证在手动调用 del
时一定会进行析构。因此删除一个监视器,并不能保证钩子也立刻被删除:
1 2 3 4 del spike_seq_monitor
若想立刻删除钩子,应该通过以下方式:
1 2 3 spike_seq_monitor.remove_hooks()
OutputMonitor <spikingjelly.activation_based.monitor.OutputMonitor>
还支持在记录数据时就对数据进行简单的处理,只需要指定构造函数中的 function_on_output
即可。 function_on_output
的默认值是 lambda x: x
,也就是默认不进行任何处理。
我们想要记录每个时刻的脉冲发放频率,首先要定义脉冲发放频率如何计算:
1 2 3 4 5 def cal_firing_rate (s_seq: torch.Tensor ): return s_seq.flatten(1 ).mean(1 )
接下来就可以以此来构建发放率监视器:
1 2 3 fr_monitor = monitor.OutputMonitor(net, neuron.IFNode, cal_firing_rate)
通过 .disable()
可以让 monitor
暂停记录,而 .enable()
则可以让其重新开始记录:
1 2 3 4 5 6 7 8 9 10 11 12 13 with torch.no_grad(): functional.reset_net(net) fr_monitor.disable() net(x_seq) functional.reset_net(net) print (f'after call fr_monitor.disable(), fr_monitor.records=\n{fr_monitor.records} ' ) fr_monitor.enable() net(x_seq) print (f'after call fr_monitor.enable(), fr_monitor.records=\n{fr_monitor.records} ' ) functional.reset_net(net) del fr_monitor
输出为:
1 2 3 4 5 6 after call fr_monitor.disable(), fr_monitor.records= [] after call fr_monitor.enable(), fr_monitor.records= [tensor([0.0000, 1.0000, 0.5000, 1.0000]), tensor([0., 1., 0., 1.])]
3.记录模块成员变量
若想记录模块的成员变量,例如神经元的电压,可以通过 :class:spikingjelly.activation_based.monitor.AttributeMonitor
实现。
神经元构造参数中的 store_v_seq: bool = False
表示在默认情况下,只记录当前时刻的电压,不记录所有时刻的电压序列。现在我们想记录所有时刻的电压,则将其更改为 True
:
1 2 3 4 5 for m in net.modules(): if isinstance (m, neuron.IFNode): m.store_v_seq = True
接下来,新建记录电压序列的监视器并进行记录:
1 2 3 4 5 6 7 8 v_seq_monitor = monitor.AttributeMonitor('v_seq' , pre_forward=False , net=net, instance=neuron.IFNode) with torch.no_grad(): net(x_seq) print (f'v_seq_monitor.records=\n{v_seq_monitor.records} ' ) functional.reset_net(net) del v_seq_monitor
输出为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 v_seq_monitor.records= [tensor([[[0.8102, 0.8677, 0.8153, 0.9200]], [[0.0000, 0.0000, 0.0000, 0.0000]], [[0.0000, 0.8129, 0.0000, 0.9263]], [[0.0000, 0.0000, 0.0000, 0.0000]]]), tensor([[[0.2480, 0.4848]], [[0.0000, 0.0000]], [[0.8546, 0.6674]], [[0.0000, 0.0000]]])]
4.记录模块输入
设置输入监视器的方法,和设置输出监视器的如出一辙:
1 2 3 4 5 6 7 8 input_monitor = monitor.InputMonitor(net, neuron.IFNode) with torch.no_grad(): net(x_seq) print (f'input_monitor.records=\n{input_monitor.records} ' ) functional.reset_net(net) del input_monitor
输出为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 input_monitor.records= [tensor([[[1.1710, 0.7936, 0.9325, 0.8227]], [[1.4373, 0.7645, 1.2167, 1.3342]], [[1.6011, 0.9850, 1.2648, 1.2650]], [[0.9322, 0.6143, 0.7481, 0.9770]]]), tensor([[[0.8072, 0.7733]], [[1.1186, 1.2176]], [[1.0576, 1.0153]], [[0.4966, 0.6030]]])]
5.记录模块的输入梯度: ∂ L ∂ Y \frac{\partial L}{\partial Y} ∂ Y ∂ L
如果我们想要记录每一层脉冲神经元的输入梯度 :∂ L ∂ S \frac{\partial L}{\partial S} ∂ S ∂ L ,则可以使用spikingjelly.activation_based.monitor.GradOutputMonitor
轻松实现:
1 2 3 4 5 6 spike_seq_grad_monitor = monitor.GradOutputMonitor(net, neuron.IFNode) net(x_seq).sum ().backward() print (f'spike_seq_grad_monitor.records=\n{spike_seq_grad_monitor.records} ' )functional.reset_net(net) del spike_seq_grad_monitor
输出为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 spike_seq_grad_monitor.records= [tensor([[[1. , 1. ]], [[1. , 1. ]], [[1. , 1. ]], [[1. , 1. ]]]), tensor([[[ 0.0803 , 0.0383 , 0.1035 , 0.1177 ]], [[-0.1013 , -0.1346 , -0.0561 , -0.0085 ]], [[ 0.5364 , 0.6285 , 0.3696 , 0.1818 ]], [[ 0.3704 , 0.4747 , 0.2201 , 0.0596 ]]])]
由于我们使用 .sum().backward()
,因而损失传给最后一层输出脉冲的梯度全为1。
6. 记录模块的输出梯度 :∂ L ∂ X \frac{\partial L}{\partial X} ∂ X ∂ L
使用spikingjelly.activation_based.monitor.GradInputMonitor
可以轻松记录模块的输出梯度∂ L ∂ X \frac{\partial L}{\partial X} ∂ X ∂ L 。
让我们构建一个深度网络,调节替代函数的 alpha
并比较不同 alpha
下的梯度的幅值:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import torchimport torch.nn as nnfrom spikingjelly.activation_based import monitor, neuron, functional, layer, surrogatenet = [] for i in range (10 ): net.append(layer.Linear(8 , 8 )) net.append(neuron.IFNode()) net = nn.Sequential(*net) functional.set_step_mode(net, 'm' ) T = 4 N = 1 x_seq = torch.rand([T, N, 8 ]) input_grad_monitor = monitor.GradInputMonitor(net, neuron.IFNode, function_on_grad_input=torch.norm) for alpha in [0.1 , 0.5 , 2 , 4 , 8 ]: for m in net.modules(): if isinstance (m, surrogate.Sigmoid): m.alpha = alpha net(x_seq).sum ().backward() print (f'alpha={alpha} , input_grad_monitor.records=\n{input_grad_monitor.records} \n' ) functional.reset_net(net) for param in net.parameters(): param.grad.zero_() input_grad_monitor.records.clear()
输出为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 alpha=0.1, input_grad_monitor.records= [tensor(0.3868), tensor(0.0138), tensor(0.0003), tensor(9.1888e-06), tensor(1.0164e-07), tensor(1.9384e-09), tensor(4.0199e-11), tensor(8.6942e-13), tensor(1.3389e-14), tensor(2.7714e-16)] alpha=0.5, input_grad_monitor.records= [tensor(1.7575), tensor(0.2979), tensor(0.0344), tensor(0.0045), tensor(0.0002), tensor(1.5708e-05), tensor(1.6167e-06), tensor(1.6107e-07), tensor(1.1618e-08), tensor(1.1097e-09)] alpha=2, input_grad_monitor.records= [tensor(3.3033), tensor(1.2917), tensor(0.4673), tensor(0.1134), tensor(0.0238), tensor(0.0040), tensor(0.0008), tensor(0.0001), tensor(2.5466e-05), tensor(3.9537e-06)] alpha=4, input_grad_monitor.records= [tensor(3.5353), tensor(1.6377), tensor(0.7076), tensor(0.2143), tensor(0.0369), tensor(0.0069), tensor(0.0026), tensor(0.0006), tensor(0.0003), tensor(8.5736e-05)] alpha=8, input_grad_monitor.records= [tensor(4.3944), tensor(2.4396), tensor(0.8996), tensor(0.4376), tensor(0.0640), tensor(0.0122), tensor(0.0053), tensor(0.0016), tensor(0.0013), tensor(0.0005)]
7. 降低内存占用
如果我们需要记录大量数据,当被记录的数据是脉冲时,可以通过一些方法来降低内存占用。
为了能够进行浮点计算,尽管脉冲只含有0/1,但它们仍然被存储为浮点形式。因此,脉冲tensor的数据类型仍然为float32,或float16(如果使用混合精度训练)。
将float32转换为bool类型,可以降低内存占用。但由于C++中的bool类型实际上仍然是8比特,这种方式只能把内存降低为原来的1/4:
1 2 3 4 5 6 7 8 9 10 11 12 import torchdef tensor_memory (x: torch.Tensor ): return x.element_size() * x.numel() N = 1 << 10 spike = torch.randint(0 , 2 , [N]).float () print ('float32 size =' , tensor_memory(spike))print ('torch.bool size =' , tensor_memory(spike.to(torch.bool )))
输出为:
1 2 3 4 float32 size = 4096 torch.bool size = 1024
在spikingjelly.activation_based.tensor_cache
中提供了将float32/float16类型的脉冲tensor压缩到uint8类型脉冲tensor的函数,其中uint8的tensor,每个元素使用8比特,保存8个脉冲,相当于是“真正的bool”类型。示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import torchdef tensor_memory (x: torch.Tensor ): return x.element_size() * x.numel() N = 1 << 10 spike = torch.randint(0 , 2 , [N]).float () print ('float32 size =' , tensor_memory(spike))print ('torch.bool size =' , tensor_memory(spike.to(torch.bool )))from spikingjelly.activation_based import tensor_cachespike_b, s_dtype, s_shape, s_padding = tensor_cache.float_spike_to_bool(spike) print ('bool size =' , tensor_memory(spike_b))spike_recover = tensor_cache.bool_spike_to_float(spike_b, s_dtype, s_shape, s_padding) print ('spike == spike_recover?' , torch.equal(spike, spike_recover))
输出为:
1 2 3 4 5 6 float32 size = 4096 torch.bool size = 1024 bool size = 128 spike == spike_recover? True
与监视器结合使用,只需要将压缩函数增加到监视器的自定义函数中:
1 2 3 spike_seq_monitor = monitor.OutputMonitor(net, neuron.IFNode, function_on_output=tensor_cache.float_spike_to_bool)
在访问记录的数据时,再临时解压缩即可:
1 2 3 4 for item in spike_seq_monitor.records: print (tensor_cache.bool_spike_to_float(*item))
此外,对于稀疏的脉冲,还可以考虑使用 zlib
等库进行进一步的压缩。下面是对发放率为0.2的脉冲进行进一步压缩的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import torchimport zlibfrom spikingjelly.activation_based import tensor_cachedef tensor_memory (x: torch.Tensor ): return x.element_size() * x.numel() N = 1 << 20 spike = (torch.rand([N]) > 0.8 ).float () spike_b, s_dtype, s_shape, s_padding = tensor_cache.float_spike_to_bool(spike) arr = spike_b.numpy() compressed_arr = zlib.compress(arr.tobytes()) print ("compressed ratio:" , len (compressed_arr) / arr.nbytes * tensor_memory(spike_b) / tensor_memory(spike))
输出为:
1 2 3 compressed ratio: 0.024264097213745117
如果想和监视器结合使用,仍然是放进自定义函数即可。完整的示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import torchimport torch.nn as nnimport zlibimport numpy as npfrom spikingjelly.activation_based import monitor, neuron, functional, layer, tensor_cachedef compress (spike: torch.Tensor ): spike_b, s_dtype, s_shape, s_padding = tensor_cache.float_spike_to_bool(spike) spike_cb = zlib.compress(spike_b.cpu().numpy().tobytes()) return spike_cb, s_dtype, s_shape, s_padding def decompress (spike_cb, s_dtype, s_shape, s_padding ): spike_b = torch.frombuffer(zlib.decompress(spike_cb), dtype=torch.uint8) return tensor_cache.bool_spike_to_float(spike_b, s_dtype, s_shape, s_padding) net = nn.Sequential( layer.Linear(8 , 4 ), neuron.IFNode(), layer.Linear(4 , 2 ), neuron.IFNode() ) for param in net.parameters(): param.data.abs_() functional.set_step_mode(net, 'm' ) spike_seq_monitor = monitor.OutputMonitor(net, neuron.IFNode, function_on_output=compress) T = 4 N = 1 x_seq = torch.rand([T, N, 8 ]) with torch.no_grad(): net(x_seq) for item in spike_seq_monitor.records: print (decompress(*item))
需要注意的是,zlib
的压缩只能在CPU上进行,如果原始数据在GPU上,则两边传输数据会大幅度拖慢运行速度。