#!/usr/local/anaconda3/bin/python # This code implements mathematics presented by Chad Spooner in # his blog at https://cyclostationary.blog/ to create signals for # CSP analysis. # # Subroutines are in alphabetical order. # # Variable names use the format thing_units, where underscore separates name # from its units. E.g., fc_Hz is carrier freq in Hz. # # mkSig() is the method of main interest. Nearly all other methods support # mkSig(). # # Mike Markowski # mike.ab3ap@gmail.com # Mar 2021 # To recreate signals for Chad's cyclostationary.blog: # # mkSig.py -t bpsk -s 10 -n 4000 -c 0.05 -b 0.1 # # meaning BPSK with S/N 10dB, 4000 symbols, fc=0.05 Hz and bits/s=0.1. from numpy import cos, log10, pi, sin, sqrt import matplotlib.pyplot as plt import numpy as np import random import sys ########################################################################### # # B E G I N # # Methods not used anywhere. Part of Chad's Matlab demo code, converted to # python and remain here as unneeded relics. # #vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv def addNoise(sig_iq, n_dBW): '''Add specified noise power to a signal. Inputs: sig_iq (complex[]) : signal to be made noisy. n_dBW (float) : how many dB of noise to add. Output: (complex[]) : a copy of the input signal with noise added. ''' # Unscaled noise. n_iq = np.random.randn(sig_iq.size) + 1j*np.random.randn(sig_iq.size) measured_W = np.var(n_iq, ddof=1) # Variance as Matlab calculates. desired_W = dB_to_W(n_dBW) powFactor = sqrt(desired_W/measured_W) n_iq *= powFactor return sig_iq + n_iq def bpskSeq(sps, numSyms): ''' !!! NOT CALLED ANYWHERE. LEFT HERE BECAUSE IT'S FROM CHAD'S MATLAB CODE. !!! Create random BPSK symbol sequence. https://cyclostationary.blog/2015/09/28/creating-a-simple-cs-signal-rectangular-pulse-bpsk/ Inputs: sps (int) : samples/symbol. numSyms (int) : number of bits in random sequence. Output: (float[]) : symbol sequence [b0 b0 ... b0 b1 b1 ... b1 ...] where bn are random bit levels, 1 or -1, sps copies of each. ''' c = constellationPsk(2).real # BPSK constellation, [-1, 1]. # Random sequence of 0s and 1s, converted to -1s and 1s. ind = np.random.randint(0, 2, numSyms) syms = c[ind] # Symbol sequence from bit sequence. zeroMat = np.zeros((sps-1, numSyms)) # sps-1 zeros trail 1/-1 values. syms1 = np.vstack((syms, zeroMat)) # 1/-1 values as column headers. syms1 = syms1.transpose() # Make 1/-1 values row headers. syms1 = syms1.flatten() # [bit zeros bit zeros ...] pulse = np.ones(sps) # Pulse function smoothing window, 1 symbol width. square = np.convolve(pulse, syms1) # Turn impulses into square wave. # Way simpler, no trailing zeros due to convolve, above: # square = np.repeat(syms, sps) return square #^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ # # E N D # # Methods not used anywhere. Part of Chad's Matlab demo code, converted to # python and remain here as unneeded relics. # ########################################################################### def bpskDual(writeFile=False): ''' Generate two BPSK signals with slightly different center frequencies and different symbol rates. This provides a nice example to show off CSP's benefits. Values from example in Eric April's 1991 paper, "The Advantages of Cyclostationary Processing." Really nice signal to demonstrate the utilty of CSP. ''' c = np.array([-1, 1]) # BPSK I-only constellation. factor = 2 # BPSK2 has double symbol rate of BPSK1. T1_bit = 16 # Sa/sym in BPSK1. T2_bit = factor*T1_bit # Sa/sym in BPSK2. nSym1 = 4000 # Syms in generated BPSK1 signal. nSym2 = nSym1 // factor # Syms in generated BPSK2 signal. fc1_Hz = 3.3/32 fc2_Hz = 4.0/32 ind1 = np.random.randint(0, 2, nSym1) # Random symbol sequence. ind2 = np.random.randint(0, 2, nSym2) syms1 = np.repeat(c[ind1], T1_bit) # Baseband BPSK signal. syms2 = np.repeat(c[ind2], T2_bit) bpsk1 = mix(syms1, fc1_Hz) # Mix to fc. bpsk2 = mix(syms2, fc2_Hz) sig = mkSnr(bpsk1+bpsk2, snr_dB=10) if writeFile: writeSig(sig, 'bpskDual.iq') return sig def cmdLine(argv): # Start with defaults. c = configDefault() bps = c['bps'] # Data rata in bits/sec. fc_Hz = c['fc_Hz'] # Center frequency in Hz. fs_Hz = c['fs_Hz'] # Sample rate in Sa/sec. Lintense = c['Lintense'] # Phase noise intensity. nSym = c['nSym'] # Number of symbols in generated signal. out = c['out'] # File to create, ignored here. sigType = c['sig'] # Modulation to use. snr_dB = c['snr_dB'] # S/N ratio of generated signal. prog = argv[0] argv = argv[1:] i = 0 while i < len(argv): opt = argv[i] if opt == '-b': # Bits/sec i += 1 arg = argv[i] bps = typeVerify(arg, float) elif opt == '-c': # Center freq in Hz. i += 1 arg = argv[i] fc_Hz = typeVerify(arg, float) elif opt == '-d': # Dual BPSK example. bpskDual(writeFile=True) sys.exit(0) elif opt == '-f': # Sample rate in Hz. i += 1 arg = argv[i] fs_Hz = typeVerify(arg, float) elif opt == '-h': # Help. usage() elif opt == '-n': # Number of symbols. i += 1 arg = argv[i] nSym = typeVerify(arg, int) elif opt == '-o': # Output file. i += 1 arg = argv[i] out = arg elif opt == '-p': # Phase noise intensity. i += 1 arg = argv[i] Lintense = typeVerify(arg, float) elif opt == '-s': # Sig to noise in dB. i += 1 arg = argv[i] snr_dB = typeVerify(arg, float) elif opt == '-t': # Signal type. i += 1 arg = argv[i] sigType = arg.strip().lower() else: usage() i += 1 err = False if sigType not in ['16qam', '4psk', '4qam', '64qam', '8psk', 'bpsk', 'qpsk']: msg = 'Missing -t {16qam|4psk|4qam|64qam|8psk|bpsk|qpsk} ' msg += 'signal type.\n' print(msg) usage() # Create configuration dictionary as return value. config = {} config['bps'] = bps config['fc_Hz'] = fc_Hz config['fs_Hz'] = fs_Hz config['Lintense'] = Lintense config['nSym'] = nSym config['out'] = out config['sig'] = sigType config['snr_dB'] = snr_dB return config def configDefault(): '''Return config with default values. The default dictionary returned is: config['bps'] = 0.1 config['fc_Hz'] = 0 config['fs_Hz'] = 1 config['Lintense'] = 0 config['nSym'] = 10 config['out'] = '' config['sig'] = 'bpsk' config['snr_dB'] = 30 ''' # Defaults. bps = 0.1 # 0.1 bit/sec. fc_Hz = 0 # Centered at 0 Hz. fs_Hz = 1 # 1 Hz = 1 Sa/sec sample rate. Lintense = 0 # Phase noise intensity. nSym = 10 # symbols. out = '' # Output file name. sigType = 'bpsk' # Signal type. snr = 30 # dB. # Create configuration dictionary as return value. config = {} config['bps'] = bps config['fc_Hz'] = fc_Hz config['fs_Hz'] = fs_Hz config['Lintense'] = Lintense config['nSym'] = nSym config['out'] = out config['sig'] = sigType config['snr_dB'] = snr return config def constellationPsk(m): return np.exp(2j*pi*np.arange(m)/m) # m-PSK constellation. def constellationQam(side): ind = np.arange(-(side-1), side, 2)/(side-1) # 'side' odds about 0. const = np.zeros((side, side), dtype=complex) for i,I in enumerate(ind): for j,Q in enumerate(ind): const[i,j] = I + 1j*Q return const def dB_to_W(val_dB): return 10**(val_dB/10) def demodBpsk(sig, fc_Hz, sps=1): '''Naive BPSK demodulator. Inputs sps (int) : samples/symbol. ''' bb = mix(sig, -fc_Hz) # Baseband. if sps > 1: # Replace all samples in symbol with their average. for sym in np.arange(sig.size, dtype=int) // sps: # Symbols in signal. i = sym*sps # Index of first sample in symbol number 'sym'. bb[i:i+sps] = np.mean(bb[i:i+sps]) bb[bb<0] = -1 # Smooth 0s. bb[bb>0] = 1 # Smooth 1s. return bb def mix(sig_iq, fc_Hz, dt_s=1, complexMix=True): '''Mix a given signal to a carrier frequency. Inputs: sig_iq (complex[]) : Signal to mix to some carrier. fc_Hz (float) : Carrier frequency in Hz to mix signal to. dt_s (int) : <= 1/sample rate. Default is 1 for normalized frequency. Output: (complex[]) : sig_iq mixed to carrier fc_Hz. ''' n = sig_iq.size t_s = np.linspace(dt_s, n*dt_s, n) if complexMix: # Complex signal. carrier_iq = np.exp(2j*pi*fc_Hz*t_s) else: # Real signal (for use with older literature). carrier_iq = sin(2*pi*fc_Hz*t_s) return sig_iq*carrier_iq def mkSig(config, useSrrc=False): '''Make a signal using the passed dictionary values. Input: config (dictionary) : See configDefault() for description of entries. Output: (complex) : i/q samples making up signal. ''' bps = config['bps'] # Data rata in bits/sec. fc_Hz = config['fc_Hz'] # Center frequency in Hz. fs_Hz = config['fs_Hz'] # Sample rate in Sa/sec. Lintense = config['Lintense'] # Phase noise intensity. XXX Poorly done! nSym = config['nSym'] # Number of symbols in generated signal. outfile = config['out'] # File to create, ignored here. sigType = config['sig'] # Modulation to use. snr_dB = config['snr_dB'] # S/N ratio of generated signal. bitsPerSym = {'bpsk':1, '4psk':2, '4qam':2, 'qpsk':2, '8psk':3, '16qam':4, '64qam':8} b = bitsPerSym[sigType] # Bits/sym for this modulation scheme. # Create baseband i/q signal with 1 sample/symbol. if sigType[-3:] == 'psk': m = 2**b # Number symbols in modulation scheme. ind = np.random.randint(0, m, nSym) # Random symbol sequence. c = constellationPsk(m) # m-PSK constellation. sig = c[ind] else: # 'qam' indI = np.random.randint(0, b, nSym) # Random symbol sequence. indQ = np.random.randint(0, b, nSym) # Random symbol sequence. c = constellationQam(b) sig = c[indI, indQ] # Calculate sps, samples/symbol. fs_Bd = bps/b # (b/s)/(b/sym) = sym/s = baud sps = int(fs_Hz/fs_Bd) # (Sa/s)/(sym/s) = Sa/sym config['sps'] = sps if not useSrrc: # Rectangular pulse shaping. sig = np.repeat(sig, sps) # sps Sa/sym. else: # Square root raised cosine pulse shaping. # NB, can only use this when rx does as well. h = srrc(sps) z = np.zeros(sig.size*sps, dtype=complex) z[::sps] = sig # Each bit followed by sps-1 zeros. sig = np.convolve(z, h, mode='valid') # Pulse shape. sigTx = mix(sig, fc_Hz) # Mix to carrier frequency. sigRx = mkSnr(sigTx, snr_dB) # Received sig across AWGN channel. # Phase noise is dBc/Hz at given offsets. XXX Here, it is random. Hack! if Lintense > 0: L = np.exp(1j*Lintense*np.random.randn(sigRx.size)) # Phase noise. sigRx *= L return sigRx def mkSnr(s_V, snr_dB): '''Add noise to a noise-free signal to achieve specified signal to noise ratio. Inputs: s_V (complex[]) - input signal snr_dB (float) - desired signal to noise ratio in units of dB. Output: (complex[]) - noisy version of input signal. ''' # Calculate noise power level needed to reach desired s/n. snrDesired = 10**(snr_dB/10) # Linear ratio. s_W = power_W(s_V) nDesired_W = s_W/snrDesired # Create noise at needed level. n_V = (np.random.randn(s_V.size) + 1j*np.random.randn(s_V.size)) n_W = power_W(n_V) n_V *= sqrt(nDesired_W/n_W) if False: # Debug statements. print('pS dB = %e' % W_to_dB(s_W)) n_W = power_W(n_V) print('pN dB = %e' % W_to_dB(n_W)) print('s/n dB = %e' % W_to_dB(s_W/n_W)) return s_V + n_V def plot(sig, sigType): '''Simple routine to plot i/q signal. ''' plt.title(sigType.upper()) plt.xlabel('In-phase') plt.ylabel('Quadrature') if np.max(np.imag(sig)) < 0.8: plt.ylim(-1, 1) plt.plot(np.real(sig), np.imag(sig), '.') plt.grid(True) plt.show() def power_W(sig_V): return np.mean(np.abs(sig_V)**2) def srrc(Ts, beta=0.35, nTaps=100): '''Construct a square root raised cosine filter. Impulse response, h(t), from https://en.wikipedia.org/wiki/Root-raised-cosine_filter Inputs: Ts (float) : reciprocal of symbol rate, or samples/symbol. nTaps (int) : number of taps in filter. beta (float) : roll off factor. ''' t = np.arange(-nTaps//2, nTaps//2+1) # General case. num = 4*beta*t/Ts num *= cos(pi*t/Ts*(1 + beta)) num += sin(pi*t/Ts*(1 - beta)) denom = pi*t*(1 - (4*beta*t/Ts)**2) denom[denom==0] = 1 # Singularities handled below. h = num/denom # Singularity t = 0 case. z = np.where(t == 0) h[z] = 1/Ts*(1 + beta*(4/pi - 1)) # Singularity t = +/- Ts/4Beta cases. z1 = np.where(t == -Ts/(4*beta)) z2 = np.where(t == Ts/(4*beta)) if len(z1[0]) > 0 or len(z2[0]) > 0: v = (1 + 2/pi)*sin(pi/(4*beta)) v += (1 - 2/pi)*cos(pi/(4*beta)) v *= beta/(Ts*sqrt(2)) h[z1] = h[z2] = v if False: plt.title('Square Root Raised Cosine Filter') plt.plot(t, h) plt.grid(True) plt.show() return h def typeVerify(val, valType, errMsg=None): '''Verify a value is specified type, and print if it isn't. For example, typeVerify('12.3', float, 'Oh no') 12.3 typeVerify('12.3', int, 'Oh no') Oh no Input: val : value to be type checked. valType : type to check 'val' against. Output: (valType or error) : ''' try: return valType(str(val)) except ValueError: if errMsg == None: errMsg = '%s not %s.\n' % (str(val), str(valType)) print(errMsg) usage() def usage(): s = 'Usage: mksig [-b bps] [-c fc] [-f fs] [-h] [-n n] [-o out] [-s snr] ' s += '[-t type]' print(s, end='') print('') print('-b: bits/sec, default 1 bps.') print('-c: center freq in Hz, default 0 Hz.') print('-d: dual BPSK example, creates bpskDual.iq.') print('-f: sample rate in Hz, default 1 Hz.') print('-h: help message.') print('-n: number of symbols, default 10.') print('-o: output file name.') print('-p: phase noise random intensity, default 0.') print('-s: SNR in dB, default 30 dB.') print('-t: type of signal, default bpsk. Any of 16qam, 4psk, 4qam, ') print(' 64qam, 8psk, bpsk, qpsk.') sys.exit(1) def W_to_dB(val_W): return 10*log10(val_W) def writeSig(x, filename): f = open(filename, 'wb') for i in range(x.size): f.write(x[i].real) f.write(x[i].imag) f.close() # # m a i n # def main(argv): c = cmdLine(argv) # Dictionary of settings. outfile = c['out'] # File to create. fc_Hz = c['fc_Hz'] # Center frequency in Hz. sigType = c['sig'] # Modulation. if outfile == '': bps = c['bps'] # Data rata in bits/sec. fs_Hz = c['fs_Hz'] # Sample rate in Sa/sec. snr_dB = c['snr_dB'] # S/N ratio of generated signal. outfile = ('%s_fs_%.6g_fc_%.6g_bps_%.6g_snr_%.1f.iq' % (sigType, fs_Hz, fc_Hz, bps, snr_dB)) sig = mkSig(c) writeSig(sig, outfile) print('%s created.' % outfile) sig = mix(sig, -fc_Hz, 1/fs_Hz) plt.title('Shaped Bit Pulses') plt.plot(sig.real, '.-') # Imag is near 0 for bpsk. plt.grid(True) plt.show() plot(sig, sigType) # Mix to baseband and plot. if __name__ == '__main__': main(sys.argv)