Commit a5f1170f authored by Mohammadreza's avatar Mohammadreza

im: add project files

parents
Pipeline #198 canceled with stages
__pycache__
.idea
/assets/
build
dist
*.iec
*.txt
*.h5
*.pkl
/venv/
/iec/
/results/
import numpy as np
import pickle
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from lazypredict.Supervised import LazyClassifier
from scipy import signal, stats
import seaborn as sns
import matplotlib.pyplot as plt
from server.sl8 import FileBase
from extra import design_filters
def feature_extraction(data_):
    """Compute a time-domain feature vector for one 1-D signal segment.

    :param data_: 1-D numeric array (one labeled EEG window)
    :return: list of 8 features in fixed order:
        [variance, standard deviation, RMS, skewness,
         kurtosis (Pearson, fisher=False), peak-to-peak amplitude,
         integer area under the curve (trapezoid rule, dx=1),
         number of zero crossings]
    """
    variance = np.var(data_)
    standard_dev = np.sqrt(variance)
    root_mean_square = np.sqrt(np.mean(data_ ** 2))
    skew_val = stats.skew(data_)
    # fisher=False -> Pearson definition (normal distribution scores 3.0)
    kurt_val = stats.kurtosis(data_, fisher=False)
    peak_to_peak = np.max(data_) - np.min(data_)
    area_under_curve = int(np.trapz(data_, dx=1))
    # Indices where the sign changes between consecutive samples.
    crossing_idx = np.where(np.diff(np.sign(data_)))[0]
    return [variance, standard_dev, root_mean_square, skew_val,
            kurt_val, peak_to_peak, area_under_curve, len(crossing_idx)]
# Earlier single-file runs, kept for reference:
# filename = './iec_files/1615632656_EyeOpen_1_Other.iec'
# filename = './iec_files/1618321042_EyeOpen_6_sina_izani.iec'
# filename = './iec_files/1618747412_EyeOpen_7_melika_hoseinzadeh.iec'
# filename = './iec_files/1619683416_EyeOpen_9_khosravi.iec'
# filename = './iec_files/1620309313_EyeOpen_12_mr_mohsen_shahbaz_beigi.iec'
# filename = './iec_files/1620576311_EyeOpen_14_mahan_endallah.iec'
# filename = './iec_files/1621949668_EyeOpen_16_diyana_ebrahimi_0.iec'
# filename = './iec_files/1623149547_EyeOpen_1_Other_3.iec'
# filename = './iec_files/1623154705_EyeOpen_1_Other_3.iec'
# filename = './iec_files/1623597049_EyeOpen_18_anita_mirzayi_0.iec'
# filename = './iec_files/1624281526_EyeOpen_1_Other_0.iec'
# filename = './iec_files/1624281967_EyeOpen_1_Other_0.iec'
# file_list = [filename]
# Recording sessions (one .iec file each) used to build the training set.
file_list = ['./iec_files/1615632656_EyeOpen_1_Other.iec',
             './iec_files/1618321042_EyeOpen_6_sina_izani.iec',
             './iec_files/1618747412_EyeOpen_7_melika_hoseinzadeh.iec',
             './iec_files/1619683416_EyeOpen_9_khosravi.iec',
             './iec_files/1620309313_EyeOpen_12_mr_mohsen_shahbaz_beigi.iec',
             './iec_files/1620576311_EyeOpen_14_mahan_endallah.iec',
             './iec_files/1621949668_EyeOpen_16_diyana_ebrahimi_0.iec',
             './iec_files/1623149547_EyeOpen_1_Other_3.iec',
             './iec_files/1623154705_EyeOpen_1_Other_3.iec',
             './iec_files/1623597049_EyeOpen_18_anita_mirzayi_0.iec',
             './iec_files/1624281526_EyeOpen_1_Other_0.iec',
             './iec_files/1624281967_EyeOpen_1_Other_0.iec'
             ]
# For every recording: load, filter, cut out the labeled windows, extract
# features, and balance classes by undersampling the majority class.
feature_list = []
for filename in file_list:
    # Label file lives next to the recording: 'iec_files' dir -> 'label_results',
    # '.iec' extension -> '.txt' (the dir substring is replaced first, so the
    # second replace only hits the extension).
    labeled_filename = filename.replace('iec_files', 'label_results').replace('iec', 'txt')
    iec = FileBase()
    iec.load(filename)
    data = iec.get_brain_raw_data()
    sr = iec.get_brain_sample_rate()
    gain = iec.get_gain()
    # Undo the amplifier gain, detrend each channel, then notch (50/100 Hz
    # region) and band-pass filter the whole multichannel recording.
    data = data / gain
    data = signal.detrend(data, axis=1)
    filters = design_filters(sr)
    data = signal.filtfilt(filters['notch1'][0], filters['notch1'][1], data)
    data = signal.filtfilt(filters['notch2'][0], filters['notch2'][1], data)
    data = signal.filtfilt(filters['bandpass'][0], filters['bandpass'][1], data)
    # with open(labeled_filename) as f:
    #     content = f.readlines()
    # # you may also want to remove whitespace characters like `\n` at the end of each line
    # content = [x.strip()[1:-1] for x in content]
    #
    # arr = np.zeros((len(content), 4), dtype=int)
    # for i in range(len(content)):
    #     temp = content[i].split(',')
    #     arr[i, 0] = int(temp[0])
    #     arr[i, 1] = int(temp[1])
    #     arr[i, 2] = int(temp[2])
    #     arr[i, 3] = int(temp[3])
    # del content
    # Each label row: [channel, start_sample, end_sample, class_label].
    arr = np.loadtxt(labeled_filename, dtype=int)
    features = []
    for i in range(arr.shape[0]):
        # print(arr[i, 0], arr[i, 1], arr[i, 2])
        my_data = data[arr[i, 0], arr[i, 1]: arr[i, 2]]
        # my_data = my_data / gain
        # my_data -= np.mean(my_data)
        # my_data = signal.detrend(my_data)
        # my_data = signal.filtfilt(filters['notch1'][0], filters['notch1'][1], my_data)
        # my_data = signal.filtfilt(filters['notch2'][0], filters['notch2'][1], my_data)
        # my_data = signal.filtfilt(filters['bandpass'][0], filters['bandpass'][1], my_data)
        # my_data = (my_data - np.min(my_data)) / (np.max(my_data) - np.min(my_data))
        # my_data = my_data / np.max(my_data)
        # my_data = (my_data - np.min(my_data)) / np.ptp(my_data)
        feats = feature_extraction(my_data)
        # Append the class label as the last column of the feature row.
        feats.append(arr[i, 3])
        features.append(feats)
    features = np.array(features)
    # Sort rows by label, then balance: randomly undersample class 0 (with
    # replacement) down to the number of class-1 rows.
    features = features[features[:, -1].argsort()]
    a = features[features[:, -1] == 0]
    b = features[features[:, -1] == 1]
    c = a[np.random.randint(a.shape[0], size=b.shape[0]), :]
    features = np.vstack((c, b))
    feature_list.append(features)
# Stack the per-recording feature matrices into one dataset.
X_all = feature_list[0]
for i in range(0, len(feature_list) - 1):
    X_all = np.concatenate((X_all, feature_list[i + 1]), axis=0)
# Last column is the class label, the rest are features.
X = X_all[:, :-1]
y = X_all[:, -1]
a = y[y == 0]
b = y[y == 1]
print(len(a), len(b))
# Standardize features, split 70/30, and fit an RBF SVM weighted toward
# class 1 by the (class 0 / class 1) count ratio.
scaler = StandardScaler()
scaled_data = scaler.fit_transform(X)
X_train, X_test, y_train, y_test = train_test_split(scaled_data, y, test_size=0.30)
svclassifier = SVC(kernel='rbf', class_weight={1: len(a) / len(b)}, probability=True)
svclassifier.fit(X_train, y_train)
y_pred = svclassifier.predict(X_test)
cm = confusion_matrix(y_test, y_pred)
acc = accuracy_score(y_test, y_pred)
print('Accuracy:', acc)
print(cm)
print(classification_report(y_test, y_pred))
# Show the confusion matrix as a heatmap with the accuracy in the title.
plt.figure()
ax = plt.subplot()
plt.suptitle('Accuracy: ' + str(acc))
sns.heatmap(cm, annot=True, ax=ax, fmt='g', cmap='Greens')
plt.show()
# # save the classifier
# model = {'model': svclassifier, 'scaler': scaler}
# with open('svclassifier_new3.pkl', 'wb') as fid:
#     pickle.dump(model, fid)
# Alternative models tried earlier (random forest, lazypredict sweep):
# rfc = RandomForestClassifier(max_depth=10, random_state=0)
# rfc.fit(X_train, y_train)
#
# y_pred = rfc.predict(X_test)
#
# cm = confusion_matrix(y_test, y_pred)
# acc = accuracy_score(y_test, y_pred)
# print('Accuracy:', acc)
# print(cm)
# print(classification_report(y_test, y_pred))
# plt.figure()
# ax = plt.subplot()
# plt.suptitle('Accuracy: ' + str(acc))
# sns.heatmap(cm, annot=True, ax=ax, fmt='g', cmap='Greens')
# plt.show()
# clf = LazyClassifier(verbose=0, ignore_warnings=True, custom_metric=None)
# models, predictions = clf.fit(X_train, X_test, y_train, y_test)
# print(models)
import numpy as np
from scipy import signal, stats
from sklearn.decomposition import FastICA
from pywt import wavedec
from pywt import waverec
import matplotlib.pyplot as plt
from matplotlib import colors as mcolors
from matplotlib.collections import LineCollection
def blinkremove(eeg_signal, srate, chnum, debug_mode=False):
    """
    removes blink artifact from signal
    :param eeg_signal: input eeg signal, shape (channels, samples)
    :param srate: sampling rate
    :param chnum: number of channels / ICA components
    :param debug_mode: if True, plot intermediate results
    :return: blink removed signal (channels, samples), or the string
             'converge error' when FastICA needed more than 199 iterations
    """
    ############### ICA ######################
    # the length of samples must be even
    del_flag = 0
    if (np.shape(eeg_signal)[1]) % 2 != 0:
        # Drop the first sample; remember to pad one sample back at the end.
        eeg_signal = np.delete(eeg_signal, 0, 1)
        del_flag = 1
    # chnum = eeg_signal.shape[0]
    # FastICA expects (samples, features) so work on the transpose.
    eeg_signal = eeg_signal.transpose()
    ica = FastICA(n_components=chnum, whiten=True)
    eeg_unmixed = ica.fit_transform(eeg_signal)  # Reconstruct signals
    iteres = ica.n_iter_
    print("iteration needed:", iteres)
    if iteres > 199:
        # NOTE(review): returns a string instead of raising — callers must
        # check for 'converge error' explicitly.
        return 'converge error'
    myvar = np.sqrt(np.var(eeg_unmixed))
    eeg_unmixed = eeg_unmixed / myvar  # this is to handle the scikit learn bug!
    A_ = ica.mixing_  # Get estimated mixing matrix
    eeg_unmixed = eeg_unmixed.transpose()
    if debug_mode:
        eegplot(eeg_unmixed, srate, 'ica unmixed signal')
    ############### Kurtosis ######################
    # Flag components with unusually high kurtosis (blinks are spiky).
    Kurtosis = stats.kurtosis(eeg_unmixed, axis=-1)
    # thresholding
    lessKurt = Kurtosis.copy()
    # # throw maximum away and measure std with others
    # maxIndex = np.where(lessKurt == max(lessKurt))
    # maxIndex = maxIndex[0][0]
    # lessKurt = np.array(lessKurt, maxIndex)
    mKurt = np.mean(lessKurt)
    sdKurt = np.std(lessKurt)
    CI = 0.90  # two tail 0.9
    # Upper confidence bound (t distribution, chnum-1 dof) on the mean kurtosis.
    tN_1 = stats.t.ppf(1 - (1 - CI) / 2, chnum - 1)
    kuUpperLim = mKurt + sdKurt / np.sqrt(chnum) * tN_1
    if debug_mode:
        plt.figure('Kurtosis of ICs')
        plt.bar(list(range(1, chnum + 1)), Kurtosis)
        plt.title('Kurtosis of ICs')
        plt.plot(list(range(0, chnum + 2)), kuUpperLim * np.ones(chnum + 2), 'r', 'LineWidth', 1)
    ############### Skewness ######################
    # Flag components with extreme skewness (one-sided blink deflections).
    skew = stats.skew(eeg_unmixed, axis=-1)
    # thresholding
    lessskew = skew.copy()
    mskew = np.mean(lessskew)
    sdskew = np.std(lessskew)
    CI = 0.95  # two tail 0.95
    tN_1 = stats.t.ppf(1 - (1 - CI) / 2, chnum - 1)
    skUpperLim = mskew + sdskew / np.sqrt(chnum) * tN_1
    skLowerLim = mskew - sdskew / np.sqrt(chnum) * tN_1
    if debug_mode:
        plt.figure('Skweness of ICs')
        plt.bar(list(range(1, chnum + 1)), skew)
        plt.title('Skweness of ICs')
        plt.plot(list(range(0, chnum + 2)), skUpperLim * np.ones(chnum + 2), 'r', 'LineWidth', 1)
        plt.plot(list(range(0, chnum + 2)), skLowerLim * np.ones(chnum + 2), 'r', 'LineWidth', 1)
    ############### Perform Wavelet on Blink Components ######################
    isblink = (skew > skUpperLim) | (skew < skLowerLim) | (Kurtosis > kuUpperLim)
    eeg_ica_noblink = eeg_unmixed.copy()
    # for each blink detected IC
    for xcomp in range(chnum):
        if isblink[xcomp]:
            blink = eeg_unmixed[xcomp, :]
            # 8-level biorthogonal 4.4 decomposition -> 9 coefficient arrays.
            coeffs = wavedec(data=blink, level=8, wavelet='bior4.4')
            # null the blink wavelet coefficients
            newcoeffs = coeffs.copy()
            K = np.zeros(9)
            for ix in range(9):
                temp = coeffs[ix]
                # MAD-based noise-sigma estimate per sub-band.
                sigma2 = np.median(abs(temp) / 0.6745)
                # NOTE(review): the universal threshold is usually
                # sigma * sqrt(2 * ln(N)); here log10 is used and sigma sits
                # inside the sqrt — confirm against the reference this
                # implements before changing.
                K[ix] = np.sqrt(2 * np.log10(len(temp)) * sigma2)
                # type I
                Indx = list(np.where(np.abs(temp) > K[ix])[0])
                # NOTE(review): temp aliases coeffs[ix], so coeffs is also
                # mutated in place here.
                temp[Indx] = 0
                newcoeffs[ix] = temp
            eeg_ica_noblink[xcomp] = waverec(newcoeffs, 'bior4.4')
    eeg_ica_noblink = np.array(eeg_ica_noblink)
    if debug_mode:
        eegplot(eeg_ica_noblink, srate, 'ICA Blink Removed')
    ############### Reconstruct EEG signals #######################
    # inverse of ICA
    eeg_ica_noblink = eeg_ica_noblink * myvar  # this is to handle the scikit learn bug!
    eeg_ica_noblink = eeg_ica_noblink.transpose()
    eeg_blink_removed = ica.inverse_transform(eeg_ica_noblink)
    eeg_blink_removed = eeg_blink_removed.transpose()
    eeg_signal = eeg_signal.transpose()
    # Spectral Coherence Measure
    if debug_mode:
        cxy = []
        for xch in range(chnum):
            # [cxy(xChannel,:), fc] = mscohere(data(xChannel,:), FinalEEG(xChannel,:), hamming(512), 256, 256, 2000)
            f, Cxy = signal.coherence(eeg_signal[xch], eeg_blink_removed[xch], srate, nperseg=1024)
            cxy.append(Cxy)
        mcxy = np.mean(np.array(cxy), axis=0)
        plt.figure("Spectral Coherence Measurement")
        plt.stem(f, mcxy)
    if del_flag == 1:
        # Restore the original sample count by repeating the last column once.
        eeg_blink_removed = np.hstack((eeg_blink_removed, np.tile(eeg_blink_removed[:, [-1]], 1)))
    return eeg_blink_removed
def eegplot(eeg_data, srate, title, fig=None, block=False):
    """
    the function to plot eeg data
    :param eeg_data: get eeg data, shape (channels, samples)
    :param srate: sampling rate of eeg signal
    :param title: string of plot title
    :param fig: existing matplotlib figure to draw into; created if None
    :param block: unused here — presumably intended for plt.show(block=...); verify
    :return: plot the data
    """
    eeg_data = np.array(eeg_data)
    # Detrend all channels except the last one — NOTE(review): the last row is
    # deliberately skipped (trigger/reference channel?); confirm.
    eeg_data[:-1, :] = signal.detrend(eeg_data[:-1, :], axis=-1, type='linear')
    n_samples, n_rows = np.shape(eeg_data)[1], np.shape(eeg_data)[0]
    # normalize each column to be able to show
    for i in range(n_rows):
        eeg_data[i, :] = eeg_data[i, :] - np.mean(eeg_data[i, :])
        # +1 in the denominator guards against a zero-std (flat) channel.
        eeg_data[i, :] = eeg_data[i, :] / (np.std(eeg_data[i, :]) + 1)
    if fig is None:
        fig = plt.figure("EEG plot of {}".format(title))
    axes = plt.axes()
    # Load the EEG data
    data = np.transpose(eeg_data)
    t = np.arange(n_samples) / srate
    # Plot the EEG
    ticklocs = []
    axes.set_xlim(0, t.max())
    # ax2.set_xticks(np.arange(10))
    segs = []
    y1 = 0
    # Stack channels vertically: each channel is offset by the cumulative
    # peak-to-peak range of the channels below it.
    for i in range(n_rows):
        dmin = data[:, i].min()
        dmax = data[:, i].max()
        dr = (dmax - dmin)
        segs.append(np.column_stack((t, data[:, i])))
        ticklocs.append(y1)
        y1 = y1 + dr
    y0 = data[:, 0].min()
    axes.set_ylim(y0, y1)
    offsets = np.zeros((n_rows, 2), dtype=float)
    offsets[:, 1] = ticklocs
    colors = [mcolors.to_rgba(c)
              for c in plt.rcParams['axes.prop_cycle'].by_key()['color']]
    lines = LineCollection(segs, offsets=offsets, transOffset=None, linewidths=0.5,
                           colors=colors, linestyle='solid')
    axes.add_collection(lines)
    # Set the yticks to use axes coordinates on the y axis
    axes.set_yticks(ticklocs)
    ch_name_list = []
    for i in range(n_rows):
        ch_name_list.append('CH '+str(i))
    axes.set_yticklabels(ch_name_list)
    axes.set_xlabel('Time (s)')
    plt.suptitle(title)
    plt.tight_layout()
    plt.show()
import numpy as np
from blink_remove import blinkremove, eegplot
from scipy import signal
from server.sl8 import FileBase
from extra import design_filters, iec_maker
# Batch blink removal: load each recording, filter, crop edge transients,
# remove blinks via ICA, and save the cleaned result as a new .iec file.
# NOTE(review): file_list is empty as committed, so this loop is a no-op
# until file paths are added.
file_list = []
for filename in file_list:
    try:
        iec = FileBase()
        iec.load(filename)
        data = iec.get_brain_raw_data()
        sr = iec.get_brain_sample_rate()
        chnum = iec.get_brain_channel_count()
        gain = iec.get_gain()
        lead_off = iec.get_leadoff()
        filters = design_filters(sr)
        data = signal.filtfilt(filters['notch1'][0], filters['notch1'][1], data)
        data = signal.filtfilt(filters['notch2'][0], filters['notch2'][1], data)
        data = signal.filtfilt(filters['bandpass'][0], filters['bandpass'][1], data)
        # Drop 1024 samples at each end to discard filter edge effects.
        data = data[:, 1024:-1024]
        eegplot(data, sr, 'EEG before ICA')
        clean_data = blinkremove(data, sr, chnum, debug_mode=False)
        eegplot(clean_data, sr, 'EEG after ICA')
        # Save under the original basename (without extension).
        iec_maker(clean_data, filename.split('/')[-1].split('.')[0], sr)
    # Broad catch so one bad file doesn't abort the whole batch; the error
    # is printed and the loop continues.
    except Exception as e:
        print(e)
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from keras.datasets.fashion_mnist import load_data
from keras.models import Sequential
from keras.layers import Conv2D, Conv2DTranspose, MaxPool2D, GaussianNoise, Reshape
from keras.optimizers import SGD, RMSprop, Adam, Adadelta, Adamax, Adagrad
# Denoising convolutional autoencoder on Fashion-MNIST.
# Load the dataset and scale pixel intensities from [0, 255] to [0, 1].
(X_train_full, y_train_full), (X_test, y_test) = load_data()
X_train_full = X_train_full.astype(np.float32) / 255
X_test = X_test.astype(np.float32) / 255
# Hold out the last 5000 training images for validation.
X_train, X_valid = X_train_full[:55000], X_train_full[55000:]
# y_train, y_valid = y_train_full[:55000], y_train_full[55000:]

# Encoder: add Gaussian noise, then 3 conv+pool stages 28x28x1 -> 3x3x64.
denoising_encoder = Sequential([
    Reshape([28, 28, 1], input_shape=[28, 28]),
    GaussianNoise(0.2),
    Conv2D(16, kernel_size=3, padding="SAME", activation="selu"),
    MaxPool2D(pool_size=2),
    Conv2D(32, kernel_size=3, padding="SAME", activation="selu"),
    MaxPool2D(pool_size=2),
    Conv2D(64, kernel_size=3, padding="SAME", activation="selu"),
    MaxPool2D(pool_size=2)
])
# tf.keras.utils.plot_model(denoising_encoder, to_file='denoising_encoder.png', show_shapes=True)
# Decoder: 3 transposed convs back to 28x28, sigmoid output in [0, 1].
denoising_decoder = Sequential([
    Conv2DTranspose(32, kernel_size=3, strides=2, padding="VALID", activation="selu",
                    input_shape=[3, 3, 64]),
    Conv2DTranspose(16, kernel_size=3, strides=2, padding="SAME", activation="selu"),
    Conv2DTranspose(1, kernel_size=3, strides=2, padding="SAME", activation="sigmoid"),
    Reshape([28, 28])
])
# tf.keras.utils.plot_model(denoising_decoder, to_file='denoising_decoder.png', show_shapes=True)
denoising_ae = Sequential([denoising_encoder, denoising_decoder])
# FIX: the `lr` keyword was deprecated and later removed from Keras
# optimizers; `learning_rate` is the supported spelling (same behavior).
denoising_ae.compile(loss="binary_crossentropy", optimizer=SGD(learning_rate=0.5))
# Train to reconstruct the clean input (noise is injected inside the encoder).
history = denoising_ae.fit(X_train, X_train, epochs=5,
                           validation_data=(X_valid, X_valid))
# Show 5 noisy validation images (top row) and their reconstructions (bottom).
# training=True forces the GaussianNoise layer to be active at inference.
corrupted_inputs = GaussianNoise(0.2)(X_valid[8:13], training=True)
reconstructs = denoising_ae.predict(corrupted_inputs)
plt.figure(figsize=(15, 12))
for i in range(5):
    plt.subplot(2, 5, 1 + i)
    plt.imshow(corrupted_inputs[i], cmap='binary')
    plt.axis('off')
    plt.subplot(2, 5, 6 + i)
    plt.imshow(reconstructs[i], cmap='binary')
    plt.axis('off')
plt.show()
from keras.datasets import mnist
from keras.layers import Input, Dense
from keras.models import Model
import numpy as np
import matplotlib.pyplot as plt
# Dense denoising autoencoder on MNIST: train to map noisy digits back to
# clean ones, then visualize originals, noisy inputs, codes, reconstructions.
# loading only images and not their labels
(X_train, _), (X_test, _) = mnist.load_data()
# Scale to [0, 1] and flatten 28x28 images to 784-vectors.
X_train = X_train.astype('float32') / 255
X_test = X_test.astype('float32') / 255
X_train = X_train.reshape(len(X_train), np.prod(X_train.shape[1:]))
X_test = X_test.reshape(len(X_test), np.prod(X_test.shape[1:]))
# Corrupt with additive Gaussian noise (sigma=0.5), clipped back to [0, 1].
X_train_noisy = X_train + np.random.normal(loc=0.0, scale=0.5, size=X_train.shape)
X_train_noisy = np.clip(X_train_noisy, 0., 1.)
X_test_noisy = X_test + np.random.normal(loc=0.0, scale=0.5, size=X_test.shape)
X_test_noisy = np.clip(X_test_noisy, 0., 1.)
print(X_train_noisy.shape)
print(X_test_noisy.shape)
# Input image
input_img = Input(shape=(784,))
# encoded and decoded layer for the autoencoder (784 -> 128 -> 64 -> 32 -> 64 -> 128 -> 784)
encoded = Dense(units=128, activation='relu')(input_img)
encoded = Dense(units=64, activation='relu')(encoded)
encoded = Dense(units=32, activation='relu')(encoded)
decoded = Dense(units=64, activation='relu')(encoded)
decoded = Dense(units=128, activation='relu')(decoded)
decoded = Dense(units=784, activation='sigmoid')(decoded)
# Building autoencoder
autoencoder = Model(input_img, decoded)
# extracting encoder (shares layers with the autoencoder)
encoder = Model(input_img, encoded)
# compiling the autoencoder
autoencoder.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
# Fitting the noise trained data to the autoencoder
# autoencoder.fit(X_train_noisy, X_train_noisy,
#                 epochs=100,
#                 batch_size=128,
#                 shuffle=True,
#                 validation_data=(X_test_noisy, X_test_noisy))
# Supervised denoising: noisy input -> clean target.
autoencoder.fit(X_train_noisy, X_train,
                epochs=100,
                batch_size=128,
                validation_split=0.2,
                verbose=2)
# reconstructing the image from autoencoder and encoder
encoded_imgs = encoder.predict(X_test_noisy)
predicted = autoencoder.predict(X_test_noisy)
# plotting the noised image, encoded image and the reconstructed image
plt.figure(figsize=(20, 2))
for i in range(10):
    # display original images
    ax = plt.subplot(4, 20, i + 1)
    plt.imshow(X_test[i].reshape(28, 28))
    plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
    # display noised images
    ax = plt.subplot(4, 20, i + 1 + 20)
    plt.imshow(X_test_noisy[i].reshape(28, 28))
    plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
    # display encoded images (32-dim code shown as an 8x4 patch)
    ax = plt.subplot(4, 20, 2 * 20 + i + 1)
    plt.imshow(encoded_imgs[i].reshape(8, 4))
    plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
    # display reconstruction images
    ax = plt.subplot(4, 20, 3 * 20 + i + 1)
    plt.imshow(predicted[i].reshape(28, 28))
    plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
plt.show()
import numpy as np
from scipy import signal
from server.sl8 import FileBase
def design_filters(s_rate):
    """Design the standard EEG preprocessing filters for a sample rate.

    :param s_rate: sampling rate in Hz
    :return: dict of [b, a] coefficient pairs:
        'notch1'  - 5th-order Butterworth band-stop, 45-55 Hz (mains)
        'notch2'  - 5th-order Butterworth band-stop, 95-105 Hz (harmonic)
        'bandpass'- 5th-order Butterworth band-pass, 0.53-32 Hz
    """
    band_specs = {
        'notch1': ((45, 55), 'bandstop'),
        'notch2': ((95, 105), 'bandstop'),
        'bandpass': ((0.53, 32), 'bandpass'),
    }
    designed = {}
    for name, (edges, kind) in band_specs.items():
        b, a = signal.butter(N=5, Wn=np.array(edges), btype=kind,
                             analog=False, output='ba', fs=s_rate)
        designed[name] = [b, a]
    return designed
# def iec_maker(data, id_, sps, gain, leadoff):
def iec_maker(data, id_, sps):
    """Wrap a (channels x samples) array in a FileBase container and save it as .iec.

    :param data: 2-D array-like, channels x samples
    :param id_: identifier used as both the task name and the output filename
    :param sps: sample rate (samples per second)
    :return: the string 'success'
    """
    identifier = str(id_)
    sps = int(sps)
    file_base = FileBase(subject_id=0, protocol_name='', task_name=identifier, fatigue=0,
                         hungriness=0, sequence=[], project='1', description='')
    file_base.set_brain_data(data, int(len(data)), int(len(data[0])), sps)  # All fields are numeric.
    file_base.set_device_data('s', '12', '12', 's')  # Use standard patterns like what you can see on the server.
    file_base.set_sensor('s')  # Use string name of sensor.
    # NOTE(review): channel count hard-coded to 3 here, while the other calls
    # use len(data) — confirm this is intentional.
    file_base.set_extra_signal(data, int(3), int(len(data[0])), sps)  # All fields are numeric.
    file_base.set_output_data(data, int(len(data)), int(len(data[0])), sps)  # All fields are numeric.
    file_base.set_reaction_data([0], [0], [0])
    # file_base.set_gain(gain)
    # file_base.set_leadoff(leadoff)
    file_base.save('blink_removed_iec_cropped/', filename=id_)
    # file_base.save('iec_files/', filename=id_)
    return 'success'
from tensorflow.keras.models import Sequential, save_model
from tensorflow.keras.layers import Input, Conv1D, Conv1DTranspose, Dense
from tensorflow.keras.constraints import max_norm
from tensorflow.keras.optimizers import SGD, RMSprop, Adam, Adamax, Adagrad, Adadelta
from keras.models import Model
import matplotlib.pyplot as plt
import numpy as np
import math
from scipy import signal
from server.sl8 import FileBase
from extra import design_filters
# Recordings with blink artifacts; the matching cleaned versions live in
# blink_removed_iec_cropped/ under the same basename.
contaminated_file_list = ['./iec_files/1615632656_EyeOpen_1_Other.iec',
                          './iec_files/1618321042_EyeOpen_6_sina_izani.iec',
                          './iec_files/1618747412_EyeOpen_7_melika_hoseinzadeh.iec',
                          './iec_files/1619683416_EyeOpen_9_khosravi.iec',
                          './iec_files/1620309313_EyeOpen_12_mr_mohsen_shahbaz_beigi.iec',
                          './iec_files/1620576311_EyeOpen_14_mahan_endallah.iec',
                          './iec_files/1621949668_EyeOpen_16_diyana_ebrahimi_0.iec',
                          './iec_files/1623149547_EyeOpen_1_Other_3.iec',
                          './iec_files/1623154705_EyeOpen_1_Other_3.iec',
                          './iec_files/1623597049_EyeOpen_18_anita_mirzayi_0.iec',
                          './iec_files/1624281526_EyeOpen_1_Other_0.iec',
                          './iec_files/1624281967_EyeOpen_1_Other_0.iec'
                          ]
# Build paired (noisy, clean) training windows: for each recording, load the
# raw and blink-removed versions, cut out the windows labeled as blinks
# (label == 1), and collect them per file.
all_contaminated, all_clean = [], []
for contaminated_file in contaminated_file_list:
    clean_file = 'blink_removed_iec_cropped/' + contaminated_file.split('/')[-1]
    # Label file path mirrors the recording path (dir + extension swapped).
    labeled_filename = contaminated_file.replace('iec_files', 'label_results').replace('iec', 'txt')
    # make data
    iec = FileBase()
    iec.load(contaminated_file)
    contaminated_data = iec.get_brain_raw_data()
    sr = iec.get_brain_sample_rate()
    gain = iec.get_gain()
    # Apply the same notch + bandpass chain used elsewhere in the project.
    filters = design_filters(sr)
    contaminated_data = signal.filtfilt(filters['notch1'][0], filters['notch1'][1], contaminated_data)
    contaminated_data = signal.filtfilt(filters['notch2'][0], filters['notch2'][1], contaminated_data)
    contaminated_data = signal.filtfilt(filters['bandpass'][0], filters['bandpass'][1], contaminated_data)
    # The clean file is used as-is (no extra filtering).
    iec2 = FileBase()
    iec2.load(clean_file)
    clean_data = iec2.get_brain_raw_data()
    clean_sr = iec2.get_brain_sample_rate()
    clean_gain = iec2.get_gain()
    # from blink_remove import eegplot
    # eegplot(contaminated_data, sr, 'bad')
    # eegplot(clean_data, clean_sr, 'good')
    # with open(labeled_filename) as f:
    #     content = f.readlines()
    # # you may also want to remove whitespace characters like `\n` at the end of each line
    # content = [x.strip()[1:-1] for x in content]
    #
    # arr = np.zeros((len(content), 4), dtype=int)
    # for i in range(len(content)):
    #     temp = content[i].split(',')
    #     arr[i, 0] = int(temp[0])
    #     arr[i, 1] = int(temp[1])
    #     arr[i, 2] = int(temp[2])
    #     arr[i, 3] = int(temp[3])
    # del content
    # Each label row: [channel, start_sample, end_sample, class_label].
    arr = np.loadtxt(labeled_filename, dtype=int)
    # Keep only blink windows (label 1).
    contaminated_index = arr[arr[:, -1] == 1]
    contaminated_parts = []
    clean_parts = []
    for i in range(contaminated_index.shape[0]):
        contaminated_parts.append(
            contaminated_data[contaminated_index[i, 0], contaminated_index[i, 1]: contaminated_index[i, 2]])
        clean_parts.append(clean_data[contaminated_index[i, 0], contaminated_index[i, 1]: contaminated_index[i, 2]])
        # my_data = my_data / gain
        # my_data -= np.mean(my_data)
        # my_data = signal.detrend(my_data)
    # Equalize window lengths by dropping the last sample of longer windows.
    # NOTE(review): this only corrects a 1-sample mismatch; windows that are
    # 2+ samples longer than the minimum would still differ — confirm labels
    # only ever vary by one sample.
    min_len_contaminated_parts = len(min(contaminated_parts, key=len))
    min_len_clean_parts = len(min(clean_parts, key=len))
    for i in range(len(contaminated_parts)):
        if len(contaminated_parts[i]) > min_len_contaminated_parts:
            contaminated_parts[i] = contaminated_parts[i][:-1]
    for i in range(len(clean_parts)):
        if len(clean_parts[i]) > min_len_clean_parts:
            clean_parts[i] = clean_parts[i][:-1]
    contaminated_parts = np.array(contaminated_parts)
    clean_parts = np.array(clean_parts)
    all_contaminated.append(contaminated_parts)
    all_clean.append(clean_parts)
# Stack the per-file window arrays into single (n_windows, window_len) arrays.
contaminated_parts = all_contaminated[0]
for i in range(0, len(all_contaminated) - 1):
    contaminated_parts = np.concatenate((contaminated_parts, all_contaminated[i + 1]), axis=0)
clean_parts = all_clean[0]
for i in range(0, len(all_clean) - 1):
    clean_parts = np.concatenate((clean_parts, all_clean[i + 1]), axis=0)
# Visual sanity check of the pairs, kept for reference:
# for i in range(0, len(clean_parts)):
#     # Get the sample and the reconstruction
#     original = contaminated_parts[i]
#     pure = clean_parts[i]
#     # Matplotlib preparations
#     fig, axes = plt.subplots(1, 2)
#     # Plot sample and reconstruciton
#     axes[0].plot(original)
#     axes[0].set_title('Noisy waveform')
#     axes[1].plot(pure)
#     axes[1].set_title('Pure waveform')
#     plt.show()
# train autoencoder: either a 1-D convolutional AE ('conv') or a dense AE
# ('dense') that maps noisy blink windows to their cleaned counterparts.
desired_model = 'conv'
if desired_model == 'conv':
    # Model configuration
    input_shape = (100, 1)  # assumes every labeled window is 100 samples — TODO confirm
    batch_size = 128
    no_epochs = 100
    train_test_split = 0.2
    validation_split = 0.2
    verbosity = 1
    max_norm_value = 2.0
    # Reshape data: min-max normalize each window to [0, 1] independently.
    y_val_noisy_r = []
    y_val_pure_r = []
    for i in range(0, len(contaminated_parts)):
        noisy_sample = contaminated_parts[i]
        pure_sample = clean_parts[i]
        noisy_sample = (noisy_sample - np.min(noisy_sample)) / (np.max(noisy_sample) - np.min(noisy_sample))
        pure_sample = (pure_sample - np.min(pure_sample)) / (np.max(pure_sample) - np.min(pure_sample))
        # noisy_sample = noisy_sample / np.max(noisy_sample)
        # pure_sample = pure_sample / np.max(pure_sample)
        y_val_noisy_r.append(noisy_sample)
        y_val_pure_r.append(pure_sample)
    y_val_noisy_r = np.array(y_val_noisy_r)
    y_val_pure_r = np.array(y_val_pure_r)
    # Add a trailing channel dimension for Conv1D.
    noisy_input = y_val_noisy_r.reshape((y_val_noisy_r.shape[0], y_val_noisy_r.shape[1], 1))
    pure_input = y_val_pure_r.reshape((y_val_pure_r.shape[0], y_val_pure_r.shape[1], 1))
    # Train/test split (no shuffle: the tail of the dataset is the test set)
    percentage_training = math.floor((1 - train_test_split) * len(noisy_input))
    noisy_input, noisy_input_test = noisy_input[:percentage_training], noisy_input[percentage_training:]
    pure_input, pure_input_test = pure_input[:percentage_training], pure_input[percentage_training:]
    # Create the model: conv encoder (128 -> 32) + transposed-conv decoder.
    model = Sequential()
    model.add(Conv1D(128, kernel_size=3, kernel_constraint=max_norm(max_norm_value), activation='relu',
                     kernel_initializer='he_uniform', input_shape=input_shape))
    # model.add(Conv1D(64, kernel_size=3, kernel_constraint=max_norm(max_norm_value), activation='relu',
    #                  kernel_initializer='he_uniform'))
    model.add(Conv1D(32, kernel_size=3, kernel_constraint=max_norm(max_norm_value), activation='relu',
                     kernel_initializer='he_uniform'))
    model.add(Conv1DTranspose(32, kernel_size=3, kernel_constraint=max_norm(max_norm_value), activation='relu',
                              kernel_initializer='he_uniform'))
    # model.add(Conv1DTranspose(64, kernel_size=3, kernel_constraint=max_norm(max_norm_value), activation='relu',
    #                           kernel_initializer='he_uniform'))
    model.add(Conv1DTranspose(128, kernel_size=3, kernel_constraint=max_norm(max_norm_value), activation='relu',
                              kernel_initializer='he_uniform'))
    model.add(
        Conv1D(1, kernel_size=3, kernel_constraint=max_norm(max_norm_value), activation='sigmoid', padding='same'))
    model.summary()
    # Compile and fit data
    # model.compile(optimizer='adam', loss='binary_crossentropy')
    model.compile(optimizer=Adam(learning_rate=0.001), loss='binary_crossentropy')
    # model.fit(noisy_input, pure_input,
    #           epochs=no_epochs,
    #           batch_size=batch_size,
    #           validation_split=validation_split,
    #           verbose=verbosity)
    history = model.fit(x=noisy_input, y=pure_input,
                        epochs=no_epochs,
                        batch_size=batch_size,
                        verbose=verbosity,
                        shuffle=True,
                        validation_data=(noisy_input_test, pure_input_test))
    # "Loss" curves for train vs. validation.
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('model loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.show()
    # Generate reconstructions for the first test windows.
    num_reconstructions = 20
    samples = noisy_input_test[:num_reconstructions]
    # samples = pure_input_test[:num_reconstructions]
    reconstructions = model.predict(samples)
    # Plot reconstructions: time-domain (top row) and Welch PSD (bottom row).
    for i in np.arange(0, num_reconstructions):
        # Prediction index: test windows start at percentage_training in the
        # original (unshuffled) arrays.
        prediction_index = i + percentage_training
        # Get the sample and the reconstruction
        original = contaminated_parts[prediction_index]
        pure = clean_parts[prediction_index]
        reconstruction = np.array(reconstructions[i])
        # # Matplotlib preparations
        # fig, axes = plt.subplots(1, 3)
        # # Plot sample and reconstruciton
        # axes[0].plot(original)
        # axes[0].set_title('Noisy waveform')
        # axes[1].plot(pure)
        # axes[1].set_title('Pure waveform')
        # axes[2].plot(reconstruction)
        # axes[2].set_title('Conv Autoencoder Denoised waveform')
        # plt.show()
        # Matplotlib preparations
        fig, axes = plt.subplots(2, 3)
        # Plot sample and reconstruciton (each trace peak-normalized)
        axes[0, 0].plot(original/max(abs(original)))
        axes[0, 0].set_title('Noisy waveform')
        axes[0, 1].plot(pure/max(abs(pure)))
        axes[0, 1].set_title('Pure waveform')
        axes[0, 2].plot(reconstruction/max(abs(reconstruction)))
        axes[0, 2].set_title('Denoised waveform')
        # Welch PSDs; fs=256 is hard-coded — presumably the recording rate,
        # TODO confirm against the loaded sr.
        f, pxx = signal.welch(original, fs=256,
                              window=signal.windows.tukey(100, sym=False, alpha=.17),
                              nperseg=100, noverlap=int(100 * 0.75),
                              nfft=2 * 256, return_onesided=True,
                              scaling='spectrum', axis=-1, average='mean')
        axes[1, 0].plot(f, pxx)
        axes[1, 0].set_title('Noisy PSD')
        axes[1, 0].set_xlim((0, 30))
        f, pxx = signal.welch(pure, fs=256,
                              window=signal.windows.tukey(100, sym=False, alpha=.17),
                              nperseg=100, noverlap=int(100 * 0.75),
                              nfft=2 * 256, return_onesided=True,
                              scaling='spectrum', axis=-1, average='mean')
        axes[1, 1].plot(f, pxx)
        axes[1, 1].set_title('Pure PSD')
        axes[1, 1].set_xlim((0, 30))
        f, pxx = signal.welch(reconstruction.T.squeeze(), fs=256,
                              window=signal.windows.tukey(100, sym=False, alpha=.17),
                              nperseg=100, noverlap=int(100 * 0.75),
                              nfft=2 * 256, return_onesided=True,
                              scaling='spectrum', axis=-1, average='mean')
        axes[1, 2].plot(f, pxx)
        axes[1, 2].set_title('Denoised PSD')
        axes[1, 2].set_xlim((0, 30))
        plt.show()
    # model.save('conv_model.h5')
elif desired_model == 'dense':
    # contaminated_parts = np.vstack([contaminated_parts] * 50)
    # clean_parts = np.vstack([clean_parts] * 50)
    # Reshape data: min-max normalize each window to [0, 1] independently.
    y_val_noisy_r = []
    y_val_pure_r = []
    for i in range(0, len(contaminated_parts)):
        noisy_sample = contaminated_parts[i]
        pure_sample = clean_parts[i]
        noisy_sample = (noisy_sample - np.min(noisy_sample)) / (np.max(noisy_sample) - np.min(noisy_sample))
        pure_sample = (pure_sample - np.min(pure_sample)) / (np.max(pure_sample) - np.min(pure_sample))
        # noisy_sample = noisy_sample / np.max(noisy_sample)
        # pure_sample = pure_sample / np.max(pure_sample)
        y_val_noisy_r.append(noisy_sample)
        y_val_pure_r.append(pure_sample)
    noisy_input = np.array(y_val_noisy_r)
    pure_input = np.array(y_val_pure_r)
    # Train/test split (no shuffle)
    percentage_training = math.floor((1 - 0.2) * len(noisy_input))
    noisy_input, noisy_input_test = noisy_input[:percentage_training], noisy_input[percentage_training:]
    pure_input, pure_input_test = pure_input[:percentage_training], pure_input[percentage_training:]
    # Input image (one 100-sample window as a flat vector)
    input_img = Input(shape=(100,))
    # encoded and decoded layer for the autoencoder (100 -> 128 -> 64 -> 32 -> 64 -> 128 -> 100)
    encoded = Dense(units=128, activation='relu')(input_img)
    encoded = Dense(units=64, activation='relu')(encoded)
    encoded = Dense(units=32, activation='relu')(encoded)
    decoded = Dense(units=64, activation='relu')(encoded)
    decoded = Dense(units=128, activation='relu')(decoded)
    decoded = Dense(units=100, activation='sigmoid')(decoded)
    # Building autoencoder
    autoencoder = Model(input_img, decoded)
    # extracting encoder
    encoder = Model(input_img, encoded)
    # compiling the autoencoder
    autoencoder.compile(optimizer='adam', loss='binary_crossentropy')  # , metrics=['accuracy'])
    # Fitting the noise trained data to the autoencoder
    # history = autoencoder.fit(noisy_input, pure_input,
    #                           epochs=1000,
    #                           batch_size=128,
    #                           validation_split=0.2,
    #                           verbose=2)
    history = autoencoder.fit(x=noisy_input, y=pure_input,
                              epochs=100,
                              batch_size=64,
                              verbose=2,
                              shuffle=True,
                              validation_data=(noisy_input_test, pure_input_test))
    # "Loss" curves for train vs. validation.
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('model loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.show()
    # reconstructing the image from autoencoder and encoder
    encoded_imgs = encoder.predict(noisy_input_test)
    predicted = autoencoder.predict(noisy_input_test)
    # # plotting the noised image, encoded image and the reconstructed image
    # plt.figure(figsize=(20, 2))
    # for i in range(10):
    #     # display original images
    #
    #     ax = plt.subplot(4, 20, i + 1)
    #     plt.plot(pure_input[i])
    #
    #     # display noised images
    #     ax = plt.subplot(4, 20, i + 1 + 20)
    #     plt.plot(noisy_input_test[i])
    #
    #     # display encoded images
    #     ax = plt.subplot(4, 20, 2 * 20 + i + 1)
    #     plt.plot(encoded_imgs[i])
    #
    #     # display reconstruction images
    #     ax = plt.subplot(4, 20, 3 * 20 + i + 1)
    #     plt.plot(predicted[i])
    #
    #     plt.show()
    # Generate reconstructions
    num_reconstructions = 10
    samples = noisy_input_test[:num_reconstructions]
    reconstructions = autoencoder.predict(samples)
    # Plot reconstructions
    for i in np.arange(0, num_reconstructions):
        # Prediction index: test windows start at percentage_training in the
        # original (unshuffled) arrays.
        prediction_index = i + percentage_training
        # Get the sample and the reconstruction
        original = contaminated_parts[prediction_index]
        pure = clean_parts[prediction_index]
        reconstruction = np.array(reconstructions[i])
        # Matplotlib preparations
        fig, axes = plt.subplots(1, 3)
        # Plot sample and reconstruciton
        axes[0].plot(original)
        axes[0].set_title('Noisy waveform')
        axes[1].plot(pure)
        axes[1].set_title('Pure waveform')
        axes[2].plot(reconstruction)
        axes[2].set_title('Conv Autoencoder Denoised waveform')
        plt.show()
# predict with SVM then predict with autoencoder
import bz2
import os
from datetime import datetime
import _pickle
# Data-container version tag (not referenced elsewhere in this chunk).
DC_VERSION = '3.4.0'
class FileBase:
    """One EEG recording session, persisted as a pickled ``.iec`` file.

    Holds session metadata (subject, protocol, task, ...) plus optional
    sub-records (``DeviceData``, ``BrainData``, ``ExtraSignal``,
    ``OutputData``, ``ReactionData``) defined later in this module.
    Persistence pickles/unpickles ``self.__dict__`` directly, so the
    attribute names are part of the on-disk format and must not be
    renamed.
    """

    def __init__(self,
                 subject_id=None, protocol_name=None, task_name=None, fatigue=None, hungriness=None, sequence=None,
                 project=None, time=None, description=None, gain=24, tags=None, leadoff=None, line=None):
        # None-defaults avoid the shared-mutable-default-argument pitfall.
        if leadoff is None:
            leadoff = []
        if tags is None:
            tags = []
        self.line = line
        self.leadoff = leadoff
        self.tags = tags
        self.gain = gain
        self.subject = subject_id
        self.protocol = protocol_name
        self.task = task_name
        self.description = description
        self.fatigue = fatigue
        self.hungriness = hungriness
        self.sequence = sequence
        self.project = project
        # Fall back to "now" when no acquisition time is given.
        self.timestamp = time if time else datetime.now()
        # On-disk naming convention: <unix-seconds>_<task>_<subject>
        self.filename = str(int(datetime.timestamp(self.timestamp))) + '_' + str(self.task) + '_' + str(self.subject)
        self.device_data = None
        self.brain_data = None
        self.extra_signal = None
        self.output_data = None
        self.reaction_data = None

    # SETTERS
    def set_leadoff(self, leadoff):
        self.leadoff = leadoff

    def set_line(self, line):
        self.line = line

    def set_tags(self, tags):
        self.tags = tags

    def set_gain(self, gain):
        self.gain = gain

    def set_subject(self, subject_id):
        self.subject = subject_id

    def set_protocol(self, protocol_name):
        self.protocol = protocol_name

    def set_description(self, description):
        self.description = description

    def set_sequence(self, sequence):
        self.sequence = sequence

    def set_project(self, project):
        self.project = project

    def set_hungriness(self, hungriness):
        self.hungriness = hungriness

    def set_fatigue(self, fatigue):
        self.fatigue = fatigue

    def set_task(self, task_name):
        self.task = task_name

    def set_timestamp(self, timestamp):
        self.timestamp = timestamp

    def set_reaction_data(self, feedback, label, correct_answers=None):
        """Attach a ReactionData record; requires brain data to exist first.

        NOTE(review): ``correct_answers`` is accepted but never used —
        kept for call-site compatibility; confirm against callers.
        """
        if self.brain_data:
            self.reaction_data = ReactionData(feedback, label)
            return self.reaction_data
        else:
            return None

    def set_feedback(self, feedback):
        self.reaction_data.set_feedback(feedback)

    def set_label(self, label):
        self.reaction_data.set_label(label)

    def set_device_data(self, device_name, software_version, firmware_version, sensor_name):
        self.device_data = DeviceData(device_name, software_version, firmware_version, sensor_name)
        return self.device_data

    def set_sensor(self, sensor_name):
        self.device_data.set_sensor(sensor_name)
        return self.device_data

    def set_brain_data(self, raw_data, channel_count, sample_count, sample_rate, channels_name=None):
        self.brain_data = BrainData(raw_data, channel_count, sample_count, sample_rate, channels_name)
        return self.brain_data

    def set_brain_channels_name(self, names):
        if self.brain_data:
            return self.brain_data.set_channels_name(names)
        else:
            return False

    def set_brain_channel_name(self, channel, name):
        if self.brain_data:
            return self.brain_data.set_channel_name(channel, name)
        else:
            return False

    def set_extra_signal(self, raw_data, channel_count, sample_count, sample_rate, channels_name=None):
        self.extra_signal = ExtraSignal(raw_data, channel_count, sample_count, sample_rate, channels_name)
        return self.extra_signal

    def set_extra_channels_name(self, names):
        if self.extra_signal:
            return self.extra_signal.set_channels_name(names)
        else:
            return False

    def set_extra_channel_name(self, channel, name):
        if self.extra_signal:
            return self.extra_signal.set_channel_name(channel, name)
        else:
            return False

    def set_output_data(self, raw_data, channel_count, sample_count, sample_rate):
        self.output_data = OutputData(raw_data, channel_count, sample_count, sample_rate)
        return self.output_data

    # FILE WORLD
    def load(self, filename, compress=False):
        """Populate this instance from a pickled .iec file.

        Returns 'OK', 'File can not load.' or 'File not exist.'
        (status strings preserved for existing callers).
        """
        if not os.path.isfile(filename):
            return 'File not exist.'
        try:
            opener = bz2.BZ2File if compress else open
            # Fix: context manager closes the handle even when unpickling
            # raises (the previous code leaked it on that path).
            with opener(filename, 'rb') as f:
                # Security: unpickling runs arbitrary code — load only
                # trusted .iec files.
                tmp_dict = _pickle.load(f)
            self.__dict__.update(tmp_dict)
            return 'OK'
        except Exception as e:
            print(e)
            return 'File can not load.'

    def load_from_file(self, file, compress=False):
        """Populate this instance from an already-open binary file object."""
        try:
            if compress:
                file = bz2.BZ2File(file)
            tmp_dict = _pickle.load(file)
            self.__dict__.update(tmp_dict)
            return 'OK'
        except Exception as e:
            print(e)
            return 'File can not open.'

    def save(self, path_to_file, filename=None, compress=False):
        """Pickle this instance to ``<path_to_file><filename>.iec``.

        ``path_to_file`` is concatenated (not joined) with the filename,
        so it must end with a path separator.  Returns the logical
        filename on success, an error string otherwise.
        """
        # Fix: makedirs also creates intermediate directories (os.mkdir
        # failed on nested paths) and exist_ok avoids a TOCTOU race.
        os.makedirs(path_to_file, exist_ok=True)
        if filename is None:
            filename = self.filename
        file = str(path_to_file + filename) + ".iec"
        try:
            if compress:
                with bz2.BZ2File(file, 'w') as f:
                    _pickle.dump(self.__dict__, f, 4)
            else:
                with open(file, 'wb+') as f:
                    _pickle.dump(self.__dict__, f, 4)
            return self.get_filename()
        except Exception as e:
            print(e)
            return 'File can not save.'

    # GETTERS
    def get_leadoff(self):
        return self.leadoff

    def get_line(self):
        return self.line

    def get_tags(self):
        return self.tags

    def get_gain(self):
        return self.gain

    def get_subject_id(self):
        return self.subject

    def get_protocol_name(self):
        return self.protocol

    def get_task_name(self):
        return self.task

    def get_description(self):
        return self.description

    def get_sequence(self):
        return self.sequence

    def get_project(self):
        return self.project

    def get_hungriness(self):
        return self.hungriness

    def get_fatigue(self):
        return self.fatigue

    def get_timestamp(self):
        return self.timestamp

    def get_filename(self):
        return self.filename

    # output (every sub-record getter returns None when the record is absent)
    def get_output_raw_data(self):
        if self.output_data:
            return self.output_data.get_raw_data()
        else:
            return None

    def get_output_channel_count(self):
        if self.output_data:
            return self.output_data.get_channel_count()
        else:
            return None

    def get_output_sample_count(self):
        if self.output_data:
            return self.output_data.get_sample_count()
        else:
            return None

    def get_output_sample_rate(self):
        if self.output_data:
            return self.output_data.get_sample_rate()
        else:
            return None

    # extra signal
    def get_extra_raw_data(self):
        if self.extra_signal:
            return self.extra_signal.get_raw_data()
        else:
            return None

    def get_extra_channel_count(self):
        if self.extra_signal:
            return self.extra_signal.get_channel_count()
        else:
            return None

    def get_extra_sample_count(self):
        if self.extra_signal:
            return self.extra_signal.get_sample_count()
        else:
            return None

    def get_extra_sample_rate(self):
        if self.extra_signal:
            return self.extra_signal.get_sample_rate()
        else:
            return None

    def get_extra_channels_name(self):
        if self.extra_signal:
            return self.extra_signal.get_channels_name()
        else:
            return None

    def get_extra_channel_name(self, channel):
        if self.extra_signal:
            return self.extra_signal.get_channel_name(channel)
        else:
            return None

    # brain data
    def get_brain_raw_data(self):
        if self.brain_data:
            return self.brain_data.get_raw_data()
        else:
            return None

    def get_brain_channel_count(self):
        if self.brain_data:
            return self.brain_data.get_channel_count()
        else:
            return None

    def get_brain_sample_count(self):
        if self.brain_data:
            return self.brain_data.get_sample_count()
        else:
            return None

    def get_brain_sample_rate(self):
        if self.brain_data:
            return self.brain_data.get_sample_rate()
        else:
            return None

    def get_brain_channels_name(self):
        if self.brain_data:
            return self.brain_data.get_channels_name()
        else:
            return None

    def get_brain_channel_name(self, channel):
        if self.brain_data:
            return self.brain_data.get_channel_name(channel)
        else:
            return None

    # device and sensor
    def get_device_info(self):
        if self.device_data:
            return self.device_data.get_device_info()
        else:
            return None

    def get_sensor_name(self):
        if self.device_data:
            return self.device_data.get_sensor_name()
        else:
            return None

    # reaction data
    def get_feedback_data(self):
        if self.reaction_data:
            return self.reaction_data.get_feedback()
        else:
            return None

    def get_label_data(self):
        if self.reaction_data:
            return self.reaction_data.get_label()
        else:
            return None
class OutputData:
    """Value holder for the device's output samples and their metadata.

    Instances are pickled inside FileBase, so attribute names must stay
    stable.
    """

    def __init__(self, raw_data, channel_count, sample_count, sample_rate):
        """Store the raw buffer together with its counts and rate."""
        self.raw_data = raw_data
        self.channel_count = channel_count
        self.sample_count = sample_count
        self.sample_rate = sample_rate

    def get_raw_data(self):
        """The stored raw output samples, as given."""
        return self.raw_data

    def get_channel_count(self):
        """The stored channel count."""
        return self.channel_count

    def get_sample_count(self):
        """The stored sample count."""
        return self.sample_count

    def get_sample_rate(self):
        """The stored sampling rate."""
        return self.sample_rate
class ExtraSignal:
    """Auxiliary (non-brain) signal block with per-channel names.

    Structure mirrors BrainData; instances are pickled inside FileBase,
    so attribute names must stay stable.
    """

    def __init__(self, raw_data, channel_count=0, sample_count=0, sample_rate=0, channels_name=None):
        """Store the raw buffer and metadata; default names are empty strings."""
        if channels_name is None:
            channels_name = ['' for _ in range(channel_count)]
        self.channels_name = channels_name
        self.raw_data = raw_data
        self.channel_count = channel_count
        self.sample_count = sample_count
        self.sample_rate = sample_rate

    def get_raw_data(self):
        return self.raw_data

    def get_channel_count(self):
        return self.channel_count

    def get_channels_name(self):
        return self.channels_name

    def get_channel_name(self, channel):
        """Name of one channel, or None when the index is out of range."""
        if channel < len(self.channels_name):
            return self.channels_name[channel]
        else:
            return None

    def set_channels_name(self, names):
        """Replace all names; the list length must equal channel_count."""
        if isinstance(names, list) and len(names) == self.channel_count:
            self.channels_name = names
            return True
        else:
            return False

    def set_channel_name(self, channel, name):
        """Rename a single channel; returns True on success.

        Bug fix: the placeholder list built when channels_name was empty
        was assigned to a local and discarded, so the item assignment
        below raised IndexError.  It is now stored on the instance.
        """
        if not self.channels_name:
            self.channels_name = ['' for _ in range(self.channel_count)]
        if channel < self.channel_count:
            self.channels_name[channel] = name
            return True
        else:
            return False

    def get_sample_count(self):
        return self.sample_count

    def get_sample_rate(self):
        return self.sample_rate
class BrainData:
    """Brain-signal block (raw samples plus per-channel names).

    Structure mirrors ExtraSignal; instances are pickled inside FileBase,
    so attribute names must stay stable.
    """

    def __init__(self, raw_data, channel_count=0, sample_count=0, sample_rate=0, channels_name=None):
        """Store the raw buffer and metadata; default names are empty strings."""
        if channels_name is None:
            channels_name = ['' for _ in range(channel_count)]
        self.channels_name = channels_name
        self.raw_data = raw_data
        self.channel_count = channel_count
        self.sample_count = sample_count
        self.sample_rate = sample_rate

    def get_raw_data(self):
        return self.raw_data

    def get_channel_count(self):
        return self.channel_count

    def get_channels_name(self):
        return self.channels_name

    def get_channel_name(self, channel):
        """Name of one channel, or None when the index is out of range."""
        if channel < len(self.channels_name):
            return self.channels_name[channel]
        else:
            return None

    def set_channels_name(self, names):
        """Replace all names; the list length must equal channel_count."""
        if isinstance(names, list) and len(names) == self.channel_count:
            self.channels_name = names
            return True
        else:
            return False

    def set_channel_name(self, channel, name):
        """Rename a single channel; returns True on success.

        Bug fix: the placeholder list built when channels_name was empty
        was assigned to a local and discarded, so the item assignment
        below raised IndexError.  It is now stored on the instance.
        """
        if not self.channels_name:
            self.channels_name = ['' for _ in range(self.channel_count)]
        if channel < self.channel_count:
            self.channels_name[channel] = name
            return True
        else:
            return False

    def get_sample_count(self):
        return self.sample_count

    def get_sample_rate(self):
        return self.sample_rate
class DeviceData:
    """Identification of the recording device, its versions, and sensor.

    Instances are pickled inside FileBase, so attribute names must stay
    stable.
    """

    def __init__(self, device_name, software_version, firmware_version, sensor_name):
        self.device_name = device_name
        self.software_version = software_version
        self.firmware_version = firmware_version
        self.sensor_name = sensor_name

    def get_device_info(self):
        """Device identity as a dict (the sensor name is exposed separately)."""
        return dict(device_name=self.device_name,
                    software_version=self.software_version,
                    firmware_version=self.firmware_version)

    def get_sensor_name(self):
        """The currently stored sensor name."""
        return self.sensor_name

    def set_sensor(self, sensor_name):
        """Replace the stored sensor name."""
        self.sensor_name = sensor_name
class ReactionData:
    """Holds a session's feedback values and their label.

    Instances are pickled inside FileBase, so attribute names must stay
    stable.
    """

    def __init__(self, feedback, label):
        self.feedback, self.label = feedback, label

    def get_feedback(self):
        """The stored feedback, as given."""
        return self.feedback

    def get_label(self):
        """The stored label, as given."""
        return self.label

    def set_feedback(self, feedback):
        """Overwrite the stored feedback."""
        self.feedback = feedback

    def set_label(self, label):
        """Overwrite the stored label."""
        self.label = label
import matplotlib.pyplot as plt
import numpy as np

# Sample configuration
num_samples_visualize = 5
noise_factor = 0.05
# Load the clean waves produced by the generator script below.
data = np.load('./signal_waves_medium.npy')
x_val, y_val = data[:, 0], data[:, 1]
# Add Gaussian noise (sigma 10, scaled by noise_factor) to every wave.
noisy_samples = []
for i in range(0, len(x_val)):
    # progress report
    if i % 100 == 0:
        print(i)
    pure = np.array(y_val[i])
    noise = np.random.normal(0, 10, pure.shape)
    # NOTE(review): `signal` shadows scipy's `signal` module imported
    # elsewhere in this file.
    signal = pure + noise_factor * noise
    noisy_samples.append([x_val[i], signal])
# Save data to file for re-use
np.save('./signal_waves_noisy_medium.npy', noisy_samples)
# Visualize a few random samples
for i in range(0, num_samples_visualize):
    random_index = np.random.randint(0, len(noisy_samples) - 1)
    x_axis, y_axis = noisy_samples[random_index]
    plt.plot(x_axis, y_axis)
    plt.title(f'Visualization of noisy sample {random_index} ---- y: f(x) = x^2')
    plt.show()
from tensorflow.keras.models import Sequential, save_model
from tensorflow.keras.layers import Conv1D, Conv1DTranspose
from tensorflow.keras.constraints import max_norm
import matplotlib.pyplot as plt
import numpy as np
import math

# Model configuration
input_shape = (150, 1)
batch_size = 150
no_epochs = 5
# NOTE(review): this name shadows sklearn's train_test_split imported
# elsewhere in this file; here it is only the held-out fraction.
train_test_split = 0.3
validation_split = 0.2
verbosity = 1
max_norm_value = 2.0
# Load data: pairs of [x_axis, y_values] saved by the generator scripts.
data_noisy = np.load('./signal_waves_noisy_medium.npy')
x_val_noisy, y_val_noisy = data_noisy[:, 0], data_noisy[:, 1]
data_pure = np.load('./signal_waves_medium.npy')
x_val_pure, y_val_pure = data_pure[:, 0], data_pure[:, 1]
# Reshape data
y_val_noisy_r = []
y_val_pure_r = []
for i in range(0, len(y_val_noisy)):
    noisy_sample = y_val_noisy[i]
    pure_sample = y_val_pure[i]
    # (min-max normalization variants kept for reference)
    # noisy_sample = (noisy_sample - np.min(noisy_sample)) / (np.max(noisy_sample) - np.min(noisy_sample))
    # pure_sample = (pure_sample - np.min(pure_sample)) / (np.max(pure_sample) - np.min(pure_sample))
    # Scale by the per-sample maximum only.
    noisy_sample = noisy_sample / np.max(noisy_sample)
    pure_sample = pure_sample / np.max(pure_sample)
    y_val_noisy_r.append(noisy_sample)
    y_val_pure_r.append(pure_sample)
y_val_noisy_r = np.array(y_val_noisy_r)
y_val_pure_r = np.array(y_val_pure_r)
# Add the trailing channel axis Conv1D expects: (samples, steps, 1).
noisy_input = y_val_noisy_r.reshape((y_val_noisy_r.shape[0], y_val_noisy_r.shape[1], 1))
pure_input = y_val_pure_r.reshape((y_val_pure_r.shape[0], y_val_pure_r.shape[1], 1))
# Train/test split: first part trains, the remainder tests.
percentage_training = math.floor((1 - train_test_split) * len(noisy_input))
noisy_input, noisy_input_test = noisy_input[:percentage_training], noisy_input[percentage_training:]
pure_input, pure_input_test = pure_input[:percentage_training], pure_input[percentage_training:]
# Create the model: Conv1D encoder + Conv1DTranspose decoder.
model = Sequential()
model.add(Conv1D(150, kernel_size=3, kernel_constraint=max_norm(max_norm_value), activation='relu',
                 kernel_initializer='he_uniform', input_shape=input_shape))
# model.add(Conv1D(64, kernel_size=3, kernel_constraint=max_norm(max_norm_value), activation='relu',
#                  kernel_initializer='he_uniform'))
model.add(Conv1D(32, kernel_size=3, kernel_constraint=max_norm(max_norm_value), activation='relu',
                 kernel_initializer='he_uniform'))
model.add(Conv1DTranspose(32, kernel_size=3, kernel_constraint=max_norm(max_norm_value), activation='relu',
                          kernel_initializer='he_uniform'))
# model.add(Conv1DTranspose(64, kernel_size=3, kernel_constraint=max_norm(max_norm_value), activation='relu',
#                           kernel_initializer='he_uniform'))
model.add(Conv1DTranspose(150, kernel_size=3, kernel_constraint=max_norm(max_norm_value), activation='relu',
                          kernel_initializer='he_uniform'))
model.add(Conv1D(1, kernel_size=3, kernel_constraint=max_norm(max_norm_value), activation='sigmoid', padding='same'))
model.summary()
# Compile and fit data
model.compile(optimizer='adam', loss='binary_crossentropy')
# (validation_split variant kept for reference)
# model.fit(noisy_input, pure_input,
#           epochs=no_epochs,
#           batch_size=batch_size,
#           validation_split=validation_split,
#           verbose=verbosity)
model.fit(x=noisy_input, y=pure_input,
          epochs=no_epochs,
          batch_size=batch_size,
          verbose=verbosity,
          shuffle=True,
          validation_data=(noisy_input_test, pure_input_test))
# Generate reconstructions for the first few test samples.
num_reconstructions = 4
samples = noisy_input_test[:num_reconstructions]
reconstructions = model.predict(samples)
# Plot reconstructions
for i in np.arange(0, num_reconstructions):
    # Index into the unsplit arrays: the test set starts at percentage_training.
    prediction_index = i + percentage_training
    # Get the sample and the reconstruction
    original = y_val_noisy[prediction_index]
    pure = y_val_pure[prediction_index]
    reconstruction = np.array(reconstructions[i])
    # Matplotlib preparations
    fig, axes = plt.subplots(1, 3)
    # Plot noisy sample, clean target, and reconstruction side by side.
    axes[0].plot(original)
    axes[0].set_title('Noisy waveform')
    axes[1].plot(pure)
    axes[1].set_title('Pure waveform')
    axes[2].plot(reconstruction)
    axes[2].set_title('Conv Autoencoder Denoised waveform')
    plt.show()
import matplotlib.pyplot as plt
import numpy as np

# Sample configuration
num_samples = 100000
# Intrasample configuration
num_elements = 1
interval_per_element = 0.01
total_num_elements = int(num_elements / interval_per_element)
starting_point = int(0 - 1 * total_num_elements)
# Other configuration
num_samples_visualize = 1
# Containers for samples and subsamples
samples = []
xs = []
ys = []
# Generate samples: y = x^2 sampled on x in [-1, 1) with step 0.01.
# NOTE(review): the inner loop does not depend on j, so all num_samples
# waves are identical — presumably intentional (noise is added by a
# separate script).
for j in range(0, num_samples):
    # Report progress
    if j % 100 == 0:
        print(j)
    # Generate wave
    for i in range(starting_point, total_num_elements):
        x_val = i * interval_per_element
        y_val = x_val * x_val
        xs.append(x_val)
        ys.append(y_val)
    # Append wave to samples
    samples.append((xs, ys))
    # Clear subsample containers for next sample
    xs = []
    ys = []
# Input shape
print(np.shape(np.array(samples[0][0])))
# Save data to file for re-use
np.save('./signal_waves_medium.npy', samples)
# Visualize a few random samples
for i in range(0, num_samples_visualize):
    random_index = np.random.randint(0, len(samples) - 1)
    x_axis, y_axis = samples[random_index]
    plt.plot(x_axis, y_axis)
    plt.title(f'Visualization of sample {random_index} ---- y: f(x) = x^2')
    plt.show()
import numpy as np
import pickle
import matplotlib.pyplot as plt
from scipy import signal, stats
from keras.models import load_model
from server.sl8 import FileBase
from extra import design_filters
from blink_remove import eegplot
def feature_extraction(data_):
    """Compute an 8-element time-domain feature vector for one window.

    Features, in order: variance, standard deviation, RMS, skewness,
    Pearson kurtosis (fisher=False), peak-to-peak amplitude, integer
    trapezoidal area under the curve (dx=1), and the zero-crossing count.
    """
    variance = np.var(data_)
    crossings = np.where(np.diff(np.sign(data_)))[0]
    return [
        variance,
        np.sqrt(variance),
        np.sqrt(np.mean(data_ ** 2)),
        stats.skew(data_),
        stats.kurtosis(data_, fisher=False),
        np.max(data_) - np.min(data_),
        int(np.trapz(data_, dx=1)),
        len(crossings),
    ]
# load classifier: the pickle holds a dict with the fitted SVM ('model')
# and its feature scaler ('scaler').
with open('svclassifier_new3.pkl', 'rb') as fid:
    my_model = pickle.load(fid)
model = my_model['model']
scaler = my_model['scaler']
# Convolutional denoising autoencoder trained by the script above.
conv_model = load_model('conv_model.h5')
# NOTE(review): this first list is dead code — it is immediately
# overwritten by the second assignment below.
file_list = ['./iec_files/1615632656_EyeOpen_1_Other.iec',
             './iec_files/1618321042_EyeOpen_6_sina_izani.iec',
             './iec_files/1618747412_EyeOpen_7_melika_hoseinzadeh.iec',
             './iec_files/1619683416_EyeOpen_9_khosravi.iec',
             './iec_files/1620309313_EyeOpen_12_mr_mohsen_shahbaz_beigi.iec',
             './iec_files/1620576311_EyeOpen_14_mahan_endallah.iec',
             './iec_files/1621949668_EyeOpen_16_diyana_ebrahimi_0.iec',
             './iec_files/1623149547_EyeOpen_1_Other_3.iec',
             './iec_files/1623154705_EyeOpen_1_Other_3.iec',
             './iec_files/1623597049_EyeOpen_18_anita_mirzayi_0.iec',
             './iec_files/1624281526_EyeOpen_1_Other_0.iec',
             './iec_files/1624281967_EyeOpen_1_Other_0.iec'
             ]
file_list = [
    './test_iec/1624793690_EyeOpen_20_afshari_0.iec',
    # './test_iec/1625637362_EyeOpen_5_maryam_miralmasi_3.iec'
    # './test_iec/1625758306_EyeOpen_7_asghar_Azimi_3.iec'
]
# for file in file_list:
# iec = FileBase()
# iec.load(file)
# data = iec.get_brain_raw_data()
# sr = iec.get_brain_sample_rate()
# sc = iec.get_brain_sample_count()
# gain = iec.get_gain()
#
# # data = data / gain
#
# eegplot(data, sr, 'EEG')
#
# filters = design_filters(sr)
#
# for i in range(2):
# for j in range(0, len(data[i]) - 512, 512):
# lst = []
# my_data = data[i, j: j + 512]
#
# my_data = signal.filtfilt(filters['notch1'][0], filters['notch1'][1], my_data)
# my_data = signal.filtfilt(filters['notch2'][0], filters['notch2'][1], my_data)
# my_data = signal.filtfilt(filters['bandpass'][0], filters['bandpass'][1], my_data)
#
# my_data = my_data / gain
# # my_data -= np.mean(my_data)
# my_data = signal.detrend(my_data)
#
# for k in range(0, len(my_data) - 100, 40):
# new_data = my_data[k: k + 100]
#
# features = np.array(feature_extraction(new_data))
# temp = scaler.transform(features.reshape(1, features.shape[0]))
# state = model.predict(temp)
#
# if state:
# print('Blink', i, j, j + 512, k, k + 100, model.predict_proba(temp)[0, 1])
#
# a = np.min(new_data)
# b = np.max(new_data)
# samples = (new_data - a) / (b - a)
# samples = samples.reshape((1, new_data.shape[0], 1))
# predicted = conv_model.predict(samples).squeeze()
# predicted = predicted * (b - a) + a
# # my_data[k: k+100] = predicted
# lst.append([i, j, j + 512, k, k + 100, predicted])
# # plt.plot(my_data)
# # plt.axvline(x=k, color='r', linestyle='-')
# # plt.axvline(x=k + 100, color='r', linestyle='-')
# # plt.show()
# else:
# print('Clean')
# if len(lst) > 0:
# start = lst[0][3]
# end = lst[0][4]
#
# fig, axes = plt.subplots(2, 2)
# axes[0, 0].plot(my_data)
# axes[0, 0].set_title('before')
# f, pxx = signal.welch(my_data, fs=sr,
# window=signal.windows.tukey(len(my_data), sym=False, alpha=.17),
# nperseg=len(my_data), noverlap=int(len(my_data) * 0.75),
# nfft=2 * sr, return_onesided=True,
# scaling='spectrum', axis=-1, average='mean')
# a = pxx
# axes[1, 0].plot(f, pxx)
# axes[1, 0].set_xlim((0, 30))
# axes[1, 0].set_title('psd')
# # main_mean = np.mean(my_data)
# # win_mean = np.mean(lst[0][5])
# # lst[0][5] = lst[0][5] + (main_mean - win_mean)
# my_data[start: end] = lst[0][5]
# if len(lst) > 1:
# for x in range(1, len(lst)):
# start_new = lst[x][3]
# end_new = lst[x][4]
# # win_mean = np.mean(lst[x][5])
# # lst[x][5] = lst[x][5] + (main_mean - win_mean)
# if end_new - end > 100:
# my_data[start_new: end_new] = lst[x][5]
# end = end_new
# else:
# my_data[end: end_new] = lst[x][5][-(end_new - end):]
# end = end_new
# axes[0, 1].plot(my_data)
# axes[0, 1].set_title('after')
# f, pxx = signal.welch(my_data, fs=sr,
# window=signal.windows.tukey(len(my_data), sym=False, alpha=.17),
# nperseg=len(my_data), noverlap=int(len(my_data) * 0.75),
# nfft=2 * sr, return_onesided=True,
# scaling='spectrum', axis=-1, average='mean')
# b = pxx
# axes[1, 1].plot(f, pxx)
# axes[1, 1].set_xlim((0, 30))
# axes[1, 1].set_title('psd')
# plt.show()
#
# f, Cxy = signal.coherence(a, b, sr,
# nperseg=len(my_data) // 8, noverlap=int(len(my_data) // 8 * 0.5),
# nfft=2 * sr, axis=-1)
# coh = np.sum(Cxy[2:60], axis=0)
# plt.title(str(coh)+' / 60')
# plt.plot(f, Cxy)
# plt.xlim((0, 30))
# plt.show()
# plt.figure()
# plt.plot(f, a)
# plt.plot(f, b)
# plt.xlim((0, 30))
# plt.show()
# # eegplot(my_data, sr, 'EEG')
# Denoise the first recording: slide 512-sample blocks over each channel,
# flag blink windows with the SVM, and replace them with the
# autoencoder's reconstruction.
file = file_list[0]
iec = FileBase()
iec.load(file)
iec_data = iec.get_brain_raw_data()
sr = iec.get_brain_sample_rate()
sc = iec.get_brain_sample_count()
gain = iec.get_gain()
# eegplot(iec_data, sr, 'EEG')
filters = design_filters(sr)
iec_data = np.array(iec_data)
# Non-overlapping 512-sample blocks along the time axis.
for k in range(0, iec_data.shape[1] - 512, 512):
    org_data = iec_data[:, k:k+512]
    # Scale by gain, then detrend + two notch filters + bandpass.
    self_data = org_data / gain
    self_data = signal.detrend(self_data, axis=1)
    self_data = signal.filtfilt(filters['notch1'][0], filters['notch1'][1], self_data)
    self_data = signal.filtfilt(filters['notch2'][0], filters['notch2'][1], self_data)
    self_data = signal.filtfilt(filters['bandpass'][0], filters['bandpass'][1], self_data)
    # Process the first two channels only.
    for i in range(2):
        lst = []
        # NOTE: `data` is a view into self_data, so the in-place window
        # replacements below already modify self_data.
        data = self_data[i, :]
        # 100-sample windows with a 40-sample hop.
        for j in range(0, data.shape[0] - 100, 40):
            new_data = data[j: j + 100]
            features = np.array(feature_extraction(new_data))
            temp = scaler.transform(features.reshape(1, features.shape[0]))
            state = model.predict(temp)
            if state:
                print('Blink', i, j, j + 100, model.predict_proba(temp)[0, 1])
                # Min-max normalize the window for the autoencoder...
                a = np.min(new_data)
                b = np.max(new_data)
                samples = (new_data - a) / (b - a)
                samples = samples.reshape((1, new_data.shape[0], 1))
                predicted = conv_model.predict(samples).squeeze()
                # ...and map the reconstruction back to the original scale.
                predicted = predicted * (b - a) + a
                lst.append([i, j, j + 100, predicted])
            else:
                print('clean')
        if len(lst) > 0:
            # Before/after traces and their Welch spectra.
            fig, axes = plt.subplots(2, 2)
            axes[0, 0].plot(data)
            axes[0, 0].set_title('before')
            f, pxx = signal.welch(data, fs=sr,
                                  window=signal.windows.tukey(len(data), sym=False, alpha=.17),
                                  nperseg=len(data), noverlap=int(len(data) * 0.75),
                                  nfft=2 * sr, return_onesided=True,
                                  scaling='spectrum', axis=-1, average='mean')
            axes[1, 0].plot(f, pxx)
            axes[1, 0].set_xlim((0, 30))
            axes[1, 0].set_title('psd')
            # (mean re-centering variant kept for reference)
            # main_mean = np.mean(org_data)
            # win_mean = np.mean(lst[0][3])
            # lst[0][3] = lst[0][3] + (main_mean - win_mean)
            start = lst[0][1]
            end = lst[0][2]
            data[start: end] = lst[0][3]
            if len(lst) > 1:
                for x in range(1, len(lst)):
                    start_new = lst[x][1]
                    end_new = lst[x][2]
                    # win_mean = np.mean(lst[x][5])
                    # lst[x][5] = lst[x][5] + (main_mean - win_mean)
                    if end_new - end > 100:
                        # Disjoint window (gap larger than the window): replace wholesale.
                        data[start_new: end_new] = lst[x][3]
                        end = end_new
                    else:
                        # Overlapping window: write only the new tail portion.
                        data[end: end_new] = lst[x][3][-(end_new - end):]
                        end = end_new
            axes[0, 1].plot(data)
            axes[0, 1].set_title('after')
            f, pxx = signal.welch(data, fs=sr,
                                  window=signal.windows.tukey(len(data), sym=False, alpha=.17),
                                  nperseg=len(data), noverlap=int(len(data) * 0.75),
                                  nfft=2 * sr, return_onesided=True,
                                  scaling='spectrum', axis=-1, average='mean')
            axes[1, 1].plot(f, pxx)
            axes[1, 1].set_xlim((0, 30))
            axes[1, 1].set_title('psd')
            plt.show()
        # Redundant when `data` is a view of self_data, but kept as written.
        self_data[i, :] = data
# NOTE: The following lines are residue from the web page this file was
# exported from (GitLab comment UI), not source code. Kept as comments so
# the file remains parseable:
# Markdown is supported
# 0% or
# You are about to add 0 people to the discussion. Proceed with caution.
# Finish editing this message first!
# Please register or sign in to comment