Commit 5192ebbc authored by Mohammadreza's avatar Mohammadreza

im: add feature

parent 2f4ac40d
......@@ -12,6 +12,10 @@ def design_filters(s_rate):
filters['notch2'] = [output[0], output[1]] # b, a
output = signal.butter(N=5, Wn=np.array([0.53, 32]), btype='bandpass', analog=False, output='ba', fs=s_rate)
filters['bandpass'] = [output[0], output[1]] # b, a
output = signal.butter(N=5, Wn=np.array(1), btype='highpass', analog=False, output='ba', fs=128)
filters['highpass_resample'] = [output[0], output[1]] # b, a
output = signal.butter(N=5, Wn=np.array(40), btype='lowpass', analog=False, output='ba', fs=128)
filters['lowpass_resample'] = [output[0], output[1]] # b, a
return filters
......
import numpy as np
import dill
import pickle
from scipy import signal, stats
from server.sl8 import FileBase
from extra import design_filters
file = 'C:/Users/Mohammadreza/Desktop/1627280018_EyeOpen_1_Other_2.iec'
iec = FileBase()
iec.load(file)
iec_data = iec.get_brain_raw_data()
sr = iec.get_brain_sample_rate()
sc = iec.get_brain_sample_count()
gain = iec.get_gain()
filters = design_filters(sr)
iec_data = np.array(iec_data)
blink_remove = True
resample_rate = 128
age = '35.0'
with open('D:/Infinite8/neuro-app/assets/data/normal_open.sarmad', 'rb') as in_strm:
sarmad_file = dill.load(in_strm)[age]
with open('D:/Infinite8/neuro-app/assets/data/svclassifier_new.pkl', 'rb') as fid:
blink_svm = pickle.load(fid)
class OfflineQeegThread:
def __init__(self, data=None, sps=None, gain=None):
self.data = data
self.sps = sps
self.gain = gain
def run(self):
try:
self.preprocess()
except Exception as e:
print('Error', e)
def preprocess(self):
signals = np.array(self.data)
signals = signals[0: 21, :]
signals = signals / self.gain
signals = signal.detrend(signals, axis=1)
signals = signals[:, 2 * self.sps:signals.shape[1] - (2 * self.sps)]
b, a = filters['notch1'][0], filters['notch1'][1]
signals = signal.filtfilt(b, a, signals)
b, a = filters['notch2'][0], filters['notch2'][1]
signals = signal.filtfilt(b, a, signals)
b, a = filters['bandpass'][0], filters['bandpass'][1]
signals = signal.filtfilt(b, a, signals)
if blink_remove:
blink_index = self.artifact_rejection(signals[0, :])
if blink_index:
for item in blink_index:
signals[:, item[0]: item[1]] = np.nan
signals = signals[:, ~np.all(np.isnan(signals), axis=0)]
self.resample(signals)
def artifact_rejection(self, data):
blink_index_list = []
for j in range(0, data.shape[0] - 100, 40):
new_data = data[j: j + 100]
features = np.array(self.feature_extraction(new_data))
temp = blink_svm['scaler'].transform(features.reshape(1, features.shape[0]))
state = blink_svm['model'].predict(temp)
if state:
blink_index_list.append([j, j + 100])
print('Blink', j, j + 100)
return blink_index_list
@staticmethod
def feature_extraction(data_):
var = np.var(data_)
std = np.sqrt(var)
rms = np.sqrt(np.mean(data_ ** 2))
skewness = stats.skew(data_)
kurtosis = stats.kurtosis(data_, fisher=False)
ptop = np.max(data_) - np.min(data_)
auc = int(np.trapz(data_, dx=1))
zero_crossings = np.where(np.diff(np.sign(data_)))[0]
return [var, std, rms, skewness, kurtosis, ptop, auc, len(zero_crossings)]
def resample(self, data):
signals = data.transpose()
new_rate = resample_rate
number_of_samples = round(signals.shape[0] * float(new_rate) / self.sps)
resampled = np.zeros((number_of_samples, signals.shape[1]))
for ch in range(signals.shape[1]):
resampled[:, ch] = signal.resample(signals[:, ch], number_of_samples)
self.remontage(resampled, 'LE')
def remontage(self, data, montage='LE'):
signals = data.T
N = signals.shape[0]
I = np.eye(N)
v_final = None
if montage == 'AR':
r_ave = np.ones((N, N)) * (1 / N)
x_ = (I - r_ave)
y_ = signals
v_final = np.dot(x_, y_)
elif montage == 'LE':
r_le = np.zeros((N, N))
r_le[:, [-2, -1]] = 0.5
x_ = (I - r_le)
y_ = signals
v_final = np.dot(x_, y_)
self.filter(v_final[0:19, :])
def filter(self, data):
b, a = filters['highpass_resample'][0], filters['highpass_resample'][1]
eeg_filtered = signal.filtfilt(b, a, data)
b2, a2 = filters['lowpass_resample'][0], filters['lowpass_resample'][1]
eeg_filtered_2 = signal.filtfilt(b2, a2, eeg_filtered)
self.power(eeg_filtered_2)
def power(self, data):
f, pxx = signal.welch(data, fs=resample_rate,
window=signal.windows.tukey(2 * resample_rate, sym=False, alpha=.17),
nperseg=2 * resample_rate, noverlap=int(resample_rate * 0.75),
nfft=2 * resample_rate, return_onesided=True,
scaling='spectrum', axis=-1, average='mean')
self.calc_band_power(pxx[:, 2:82] * 0.84)
def calc_band_power(self, data):
power_abs_delta = np.sum(data[:, 0:6], axis=1)
power_abs_theta = np.sum(data[:, 6:14], axis=1)
power_abs_alpha = np.sum(data[:, 14:22], axis=1)
power_abs_beta = np.sum(data[:, 22:48], axis=1)
power_abs_high_beta = np.sum(data[:, 48:58], axis=1)
power_abs_gamma = np.sum(data[:, 58:78], axis=1)
power_abs_alpha1 = np.sum(data[:, 14:18], axis=1)
power_abs_alpha2 = np.sum(data[:, 18:22], axis=1)
power_abs_beta1 = np.sum(data[:, 22:28], axis=1)
power_abs_beta2 = np.sum(data[:, 28:34], axis=1)
power_abs_beta3 = np.sum(data[:, 34:48], axis=1)
power_abs_gamma1 = np.sum(data[:, 58:68], axis=1)
power_abs_gamma2 = np.sum(data[:, 68:78], axis=1)
power_all = np.sum(data[:, 0:78], axis=1)
power_rel_delta = power_abs_delta / power_all * 100
power_rel_theta = power_abs_theta / power_all * 100
power_rel_alpha = power_abs_alpha / power_all * 100
power_rel_beta = power_abs_beta / power_all * 100
power_rel_high_beta = power_abs_high_beta / power_all * 100
power_rel_gamma = power_abs_gamma / power_all * 100
power_rel_alpha1 = power_abs_alpha1 / power_all * 100
power_rel_alpha2 = power_abs_alpha2 / power_all * 100
power_rel_beta1 = power_abs_beta1 / power_all * 100
power_rel_beta2 = power_abs_beta2 / power_all * 100
power_rel_beta3 = power_abs_beta3 / power_all * 100
power_rel_gamma1 = power_abs_gamma1 / power_all * 100
power_rel_gamma2 = power_abs_gamma2 / power_all * 100
power_rat_dt = power_abs_delta / power_abs_theta
power_rat_da = power_abs_delta / power_abs_alpha
power_rat_db = power_abs_delta / power_abs_beta
power_rat_dhb = power_abs_delta / power_abs_high_beta
power_rat_ta = power_abs_theta / power_abs_alpha
power_rat_tb = power_abs_theta / power_abs_beta
power_rat_thb = power_abs_theta / power_abs_high_beta
power_rat_ab = power_abs_alpha / power_abs_beta
power_rat_ahb = power_abs_alpha / power_abs_high_beta
power_rat_bhb = power_abs_beta / power_abs_high_beta
#############################################################
u = sarmad_file['abs']['u']
s = sarmad_file['abs']['s']
power_abs_delta_z = (np.log10(power_abs_delta + 0.35) - u[:, 0]) / s[:, 0]
power_abs_theta_z = (np.log10(power_abs_theta + 0.35) - u[:, 1]) / s[:, 1]
power_abs_alpha_z = (np.log10(power_abs_alpha + 0.35) - u[:, 2]) / s[:, 2]
power_abs_beta_z = (np.log10(power_abs_beta + 0.35) - u[:, 3]) / s[:, 3]
power_abs_high_beta_z = (np.log10(power_abs_high_beta + 0.35) - u[:, 4]) / s[:, 4]
power_abs_alpha1_z = (np.log10(power_abs_alpha1 + 0.35) - u[:, 5]) / s[:, 5]
power_abs_alpha2_z = (np.log10(power_abs_alpha2 + 0.35) - u[:, 6]) / s[:, 6]
power_abs_beta1_z = (np.log10(power_abs_beta1 + 0.35) - u[:, 7]) / s[:, 7]
power_abs_beta2_z = (np.log10(power_abs_beta2 + 0.35) - u[:, 8]) / s[:, 8]
power_abs_beta3_z = (np.log10(power_abs_beta3 + 0.35) - u[:, 9]) / s[:, 9]
u = sarmad_file['rel']['u']
s = sarmad_file['rel']['s']
power_rel_delta_z = (np.log10(power_rel_delta + 0.35) - u[:, 0]) / s[:, 0]
power_rel_theta_z = (np.log10(power_rel_theta + 0.35) - u[:, 1]) / s[:, 1]
power_rel_alpha_z = (np.log10(power_rel_alpha + 0.35) - u[:, 2]) / s[:, 2]
power_rel_beta_z = (np.log10(power_rel_beta + 0.35) - u[:, 3]) / s[:, 3]
power_rel_high_beta_z = (np.log10(power_rel_high_beta + 0.35) - u[:, 4]) / s[:, 4]
power_rel_alpha1_z = (np.log10(power_rel_alpha1 + 0.35) - u[:, 5]) / s[:, 5]
power_rel_alpha2_z = (np.log10(power_rel_alpha2 + 0.35) - u[:, 6]) / s[:, 6]
power_rel_beta1_z = (np.log10(power_rel_beta1 + 0.35) - u[:, 7]) / s[:, 7]
power_rel_beta2_z = (np.log10(power_rel_beta2 + 0.35) - u[:, 8]) / s[:, 8]
power_rel_beta3_z = (np.log10(power_rel_beta3 + 0.35) - u[:, 9]) / s[:, 9]
u = sarmad_file['rat']['u']
s = sarmad_file['rat']['s']
power_rat_dt_z = (np.log10(power_rat_dt + 0.7) - u[:, 0]) / s[:, 0]
power_rat_da_z = (np.log10(power_rat_da + 0.7) - u[:, 1]) / s[:, 1]
power_rat_db_z = (np.log10(power_rat_db + 0.7) - u[:, 2]) / s[:, 2]
power_rat_dhb_z = (np.log10(power_rat_dhb + 0.7) - u[:, 3]) / s[:, 3]
power_rat_ta_z = (np.log10(power_rat_ta + 0.7) - u[:, 4]) / s[:, 4]
power_rat_tb_z = (np.log10(power_rat_tb + 0.7) - u[:, 5]) / s[:, 5]
power_rat_thb_z = (np.log10(power_rat_thb + 0.7) - u[:, 6]) / s[:, 6]
power_rat_ab_z = (np.log10(power_rat_ab + 0.7) - u[:, 7]) / s[:, 7]
power_rat_ahb_z = (np.log10(power_rat_ahb + 0.7) - u[:, 8]) / s[:, 8]
power_rat_bhb_z = (np.log10(power_rat_bhb + 0.7) - u[:, 9]) / s[:, 9]
power_dict = {'absolute power': {'delta': power_abs_delta, 'theta': power_abs_theta, 'alpha': power_abs_alpha,
'beta': power_abs_beta, 'high_beta': power_abs_high_beta,
'gamma': power_abs_gamma, 'alpha1': power_abs_alpha1,
'alpha2': power_abs_alpha2, 'beta1': power_abs_beta1,
'beta2': power_abs_beta2, 'beta3': power_abs_beta3,
'gamma1': power_abs_gamma1, 'gamma2': power_abs_gamma2},
'relative power': {'delta': power_rel_delta, 'theta': power_rel_theta, 'alpha': power_rel_alpha,
'beta': power_rel_beta, 'high_beta': power_rel_high_beta,
'gamma': power_rel_gamma,
'alpha1': power_rel_alpha1, 'alpha2': power_rel_alpha2,
'beta1': power_rel_beta1, 'beta2': power_rel_beta2, 'beta3': power_rel_beta3,
'gamma1': power_rel_gamma1, 'gamma2': power_rel_gamma2},
'power ratio': {'delta/theta': power_rat_dt, 'delta/alpha': power_rat_da,
'delta/beta': power_rat_db, 'delta/high_beta': power_rat_dhb,
'theta/alpha': power_rat_ta, 'theta/beta': power_rat_tb,
'theta/high_beta': power_rat_thb, 'alpha/beta': power_rat_ab,
'alpha/high_beta': power_rat_ahb, 'beta/high_beta': power_rat_bhb},
'z scored absolute power': {'delta': power_abs_delta_z, 'theta': power_abs_theta_z,
'alpha': power_abs_alpha_z,
'beta': power_abs_beta_z, 'high_beta': power_abs_high_beta_z,
'alpha1': power_abs_alpha1_z, 'alpha2': power_abs_alpha2_z,
'beta1': power_abs_beta1_z, 'beta2': power_abs_beta2_z,
'beta3': power_abs_beta3_z},
'z scored relative power': {'delta': power_rel_delta_z, 'theta': power_rel_theta_z,
'alpha': power_rel_alpha_z,
'beta': power_rel_beta_z, 'high_beta': power_rel_high_beta_z,
'alpha1': power_rel_alpha1_z, 'alpha2': power_rel_alpha2_z,
'beta1': power_rel_beta1_z, 'beta2': power_rel_beta2_z,
'beta3': power_rel_beta3_z},
'z scored power ratio': {'delta/theta': power_rat_dt_z, 'delta/alpha': power_rat_da_z,
'delta/beta': power_rat_db_z, 'delta/high_beta': power_rat_dhb_z,
'theta/alpha': power_rat_ta_z, 'theta/beta': power_rat_tb_z,
'theta/high_beta': power_rat_thb_z, 'alpha/beta': power_rat_ab_z,
'alpha/high_beta': power_rat_ahb_z, 'beta/high_beta': power_rat_bhb_z}}
print(power_dict)
oqt = OfflineQeegThread(data=iec_data, gain=gain, sps=sr)
oqt.run()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment