# Source code for roqba.composers.amadinda
import os
import itertools
import threading
from random import choice
from roqba.composers.abstract_composer import AbstractComposer
from roqba.static.scales_and_harmonies import FOLLOWINGS
class Composer(AbstractComposer):
    """Composer that plays interlocking patterns derived from source-code words.

    Two tone sequences are built from the character codes of two randomly
    chosen words (see ``all_python_words``).  The first sequence sounds on the
    even pattern positions, the second on the odd ones, and a third sequence
    (shared by voices 3 and 4) picks up the lowest tones of both.  Every
    pattern exists in five shifted variants which are stepped through until
    ``pattern_played_maximum`` is reached; then a new pattern is generated.
    """

    def __init__(self, gateway, settings, behaviour, scale="DIATONIC"):
        """Set up the composer state and create the first pattern.

        :param gateway: passed on to ``AbstractComposer``; also used here to
            mute the "drums" voice.
        :param settings: passed on to ``AbstractComposer``.
        :param behaviour: mapping read for the keys ``meter``, ``half_beat``,
            ``tone_range``, ``sequence_length``,
            ``number_of_tones_in_3rd_voice`` and ``octave_offset``.
        :param scale: NOTE(review): accepted but not used in this class; the
            scale that gets applied is ``self.scale`` -- presumably set by
            ``AbstractComposer``, confirm against the base class.
        """
        # General
        super(Composer, self).__init__(gateway,
                                       settings,
                                       behaviour)
        self.selected_meters = [self.behaviour['meter']]
        self.half_beat = self.behaviour['half_beat']
        self.second_beat_half = False

        # Amadinda specific
        self.tone_range = behaviour['tone_range']
        self.sequence_length = behaviour['sequence_length']
        self.number_of_tones_in_3rd_voice = behaviour['number_of_tones_in_3rd_voice']
        self.pattern_played_times = 0
        self.pattern_played_maximum = 90
        self.words = self.all_python_words()
        self.octave_offset = behaviour['octave_offset']
        self.transpose = 20
        for voice in self.voices.values():
            voice.duration_in_msec = 600
        self.set_scale(self.scale)
        self.make_new_pattern()
        self.gateway.mute_voice("drums", 1)

    def all_python_words(self):
        """Collect all long words from the Python files below the working dir.

        Walks ``'.'`` recursively, reads every ``*.py`` file and splits each
        line on single spaces.

        :returns: list of whitespace-stripped words of at least 12 characters,
            in file/line order (duplicates are kept).
        """
        filepaths = (os.path.join(dirpath, filename)
                     for dirpath, _dirnames, filenames in os.walk('.')
                     for filename in filenames
                     if filename.endswith('.py'))
        words = []
        for filepath in filepaths:
            with open(filepath) as file_:
                for line in file_:
                    for word in line.split(" "):
                        stripped = word.strip()
                        # a length check alone also rules out empty strings
                        if len(stripped) >= 12:
                            words.append(stripped)
        return words

    def choose_rhythm(self):
        """Do nothing; this composer does not choose a rhythm."""
        pass

    def generate(self, state):
        """main generating function, the next polyphonic step is produced here

        any of the voices can change.

        :param state: mapping read for the keys ``cycle_pos`` and ``weight``.
        :returns: ``self.comment`` -- ``'normal'``, or ``'caesura'`` when a
            new pattern was started during this step.
        :raises RuntimeError: if fewer voices exist than ``self.num_voices``.
        """
        self.comment = 'normal'
        tmp_harm = []
        meter_pos = state['cycle_pos']
        for voice in self.voices.values():
            if len(self.voices) < self.num_voices:
                raise RuntimeError("mismatch in voices count")
            self.musical_logger.debug("note {0}".format(voice.note))
            if voice.note == 0 or not voice.note_change:
                continue
            voice.note_change = True
            next_note = self.next_voice_note(voice, meter_pos)
            tmp_harm.append(next_note)

        cycle_pos = state['cycle_pos']
        send_drum = True
        if self.half_beat:
            # in half-beat mode the drummer only advances on even positions
            if cycle_pos % 2 == 0:
                # floor division keeps the position an int on Python 3, too
                cycle_pos = cycle_pos // 2
                if self.second_beat_half:
                    cycle_pos += int(self.meter[0] / 2)
                self.drummer.generator.send([state, cycle_pos])
            else:
                send_drum = False
        else:
            self.drummer.generator.send([state, cycle_pos])

        for k, v in self.drummer.frame.items():
            # TODO: re-add the drum filler
            if False and v["meta"]:
                if v["meta"] == 'empty':
                    threading.Thread(target=self.drum_fill_handler,
                                     args=(k, state)).start()
                if v["meta"] == 'mark':
                    threading.Thread(target=self.drum_mark_handler,
                                     args=(k, state)).start()
        if send_drum:
            self.gateway.drum_hub.send(self.drummer.frame)

        for voice in self.voices.values():
            self.gateway.send_voice_peak_level(voice, voice.current_microvolume)
        self.gateway.hub.send(self.voices)
        if self.notate:
            self.notator.note_to_file({"notes": tmp_harm,
                                       "weight": state["weight"],
                                       "cycle_pos": state["cycle_pos"]})
        return self.comment

    def _real_note(self, note):
        """Look up *note* in ``self.real_scale``; ``None`` for a falsy note or entry."""
        if not note:
            return None
        return self.real_scale[note] or None

    def next_voice_note(self, voice, meter_pos):
        """Determine the next note of *voice* and store it on the voice.

        A voice whose behaviour is ``"SLAVE"`` follows its followed voice at a
        distance chosen from ``FOLLOWINGS``; only its ``real_note`` is updated.
        Every other voice reads its note from the current shifted pattern at
        *meter_pos*, which updates both ``note`` and ``real_note``.

        :param voice: the voice to update.
        :param meter_pos: index into the pattern sequences.
        :returns: the new note; ``0`` (slave) or ``None`` (pattern) for a rest.
        """
        voice.update_current_microvolume()
        if voice.behaviour == "SLAVE":
            follow = self.voices[voice.followed_voice_id]
            res = 0
            if follow.note_change:
                if voice.following_counter == 0:
                    voice.follow_dist = choice(FOLLOWINGS)
                if voice.following_counter < voice.follow_limit:
                    res = follow.note + voice.follow_dist if follow.note else 0
                    voice.following_counter += 1
                else:
                    voice.reset_slave()
                    res = 0
                # defensive: a resting leader always yields a resting follower
                if follow.note == 0:
                    res = 0
            voice.real_note = self._real_note(res)
            return res

        if self.pattern_played_times >= self.pattern_played_maximum:
            self.make_new_pattern()
            self.comment = 'caesura'
            self.pattern_played_times = 0
        # maps the play counter onto the five shifted variants (indices 0..4)
        which_shift_index = int(self.pattern_played_times
                                / (self.pattern_played_maximum / 4.999))
        next_note = self.patterns[which_shift_index][voice.id][meter_pos] or None
        voice.note = next_note
        voice.real_note = self._real_note(next_note)
        self.pattern_played_times += 1.0 / len(self.patterns[0][1])
        return next_note

    def _word_to_sequence(self, word):
        """Turn *word* into exactly ``sequence_length`` tones below ``tone_range``.

        Words shorter than ``sequence_length`` are repeated cyclically so that
        all voices always get sequences of the same, full length.
        """
        tones = [ord(char) % self.tone_range for char in word]
        return list(itertools.islice(itertools.cycle(tones), self.sequence_length))

    def make_new_pattern(self):
        """Pick two random words and rebuild ``self.patterns`` from them.

        Resets ``pattern_played_times`` and stores five variants of the new
        pattern under the keys ``0`` .. ``4``, one per shift offset.
        """
        word1 = choice(self.words)
        pure_seq1 = self._word_to_sequence(word1)
        word2 = choice(self.words)
        pure_seq2 = self._word_to_sequence(word2)
        self.pattern_played_times = 0
        self.patterns = {}
        for offset in range(0, 5):
            self.patterns[offset] = self.shift_pattern(pure_seq1, pure_seq2, offset)

    def shift_pattern(self, seq1, seq2, offset):
        """Return the voice patterns for both sequences shifted by *offset*.

        Every tone is raised by *offset* and wrapped around ``tone_range``.
        """
        shifted_seq1 = [(entry + offset) % self.tone_range for entry in seq1]
        shifted_seq2 = [(entry + offset) % self.tone_range for entry in seq2]
        return self._apply_new_pattern(shifted_seq1, shifted_seq2)

    def _apply_new_pattern(self, pure_seq1, pure_seq2):
        """Interleave both sequences with rests and derive the third voice.

        The transposed tones of *pure_seq1* land on the even positions, those
        of *pure_seq2* on the odd positions; the gaps are filled with ``0``.
        The results are also kept in ``applied_seq1`` / ``applied_seq2``.

        :returns: dict mapping the keys 1..4 to their sequence; 3 and 4
            share the same third-voice sequence.
        """
        rests = [0] * self.sequence_length
        self.applied_seq1 = list(itertools.chain.from_iterable(
            zip([tone + self.transpose for tone in pure_seq1], rests)))
        self.applied_seq2 = list(itertools.chain.from_iterable(
            zip(rests, [tone + self.transpose for tone in pure_seq2])))
        seq3 = self._make_third_voice(self.applied_seq1, self.applied_seq2)
        return {
            1: self.applied_seq1,
            2: self.applied_seq2,
            3: seq3,
            4: seq3
        }

    def _make_third_voice(self, applied_seq1, applied_seq2):
        """Build the third voice from the lowest tones of both sequences.

        For each position (at most ``2 * sequence_length`` of them) a sounding
        tone below ``transpose + number_of_tones_in_3rd_voice`` is taken over,
        moved by ``2 * octave_offset - 1``; the first sequence has priority.
        All other positions become rests (``0``).
        """
        upper_limit = self.transpose + self.number_of_tones_in_3rd_voice
        displacement = 2 * self.octave_offset - 1
        seq3 = []
        positions = itertools.islice(zip(applied_seq1, applied_seq2),
                                     self.sequence_length * 2)
        for tone1, tone2 in positions:
            if 0 < tone1 < upper_limit:
                seq3.append(tone1 + displacement)
            elif 0 < tone2 < upper_limit:
                seq3.append(tone2 + displacement)
            else:
                seq3.append(0)
        return seq3