Source code for ndspy.soundStream

# Copyright 2019 RoadrunnerWMC
#
# This file is part of ndspy.
#
# ndspy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ndspy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with ndspy.  If not, see <https://www.gnu.org/licenses/>.
"""
Support for STRMs.
"""

import struct

from . import WaveType
from . import _common


[docs]class STRM: """ A STRM streamed audio file. This is a piece of music, usually used for background music or jingles. """ # When saving SDAT, two otherwise identical STRMs will share data # only if their dataMergeOptimizationIDs are the same. # You can pretty safely ignore this. dataMergeOptimizationID = 0 def __init__(self, file=None, unk02=0, volume=127, priority=64, playerID=0, unk07=0): self.unk02 = unk02 self.volume = volume self.priority = priority self.playerID = playerID self.unk07 = unk07 if file is not None: if not file.startswith(b'STRM'): raise ValueError("Wrong magic (should be b'STRM', instead" f' found {file[:4]})') self._initFromData(file) else: self.waveType = WaveType.PCM8 self.isLooped = False self.channels = [] self.unk03 = 0 self.sampleRate = 8000 self.time = 0 self.loopOffset = 0 self.samplesPerBlock = 0 self.samplesInLastBlock = 0 self.unk28 = 0 self.unk2C = 0 self.unk30 = 0 self.unk34 = 0 self.unk38 = 0 self.unk3C = 0 self.unk40 = 0 self.unk44 = 0
[docs] @classmethod def fromChannels(cls, channels, unk02=0, volume=127, priority=64, playerID=0, unk07=0): """ Create a STRM from a list of channels. """ obj = cls(unk02=unk02, volume=volume, priority=priority, playerID=playerID, unk07=unk07) obj.channels = channels return obj
[docs] @classmethod def fromFile(cls, filePath, *args, **kwargs): """ Load a STRM from a filesystem file. """ with open(filePath, 'rb') as f: return cls(f.read(), *args, **kwargs)
def _initFromData(self, file): """ Initialize the STRM from file data. """ magic, bom, version, filesize, headersize, numblocks = \ _common.NDS_STD_FILE_HEADER.unpack_from(file, 0) if version != 0x100: raise ValueError(f'Unsupported STRM version: {version}') assert magic == b'STRM', f'Incorrect STRM magic ({magic})' headMagic, headSize = struct.unpack_from('<4sI', file, 0x10) assert headMagic == b'HEAD', f'Incorrect STRM HEAD magic ({headMagic})' (waveType, self.isLooped, numChannels, self.unk03, self.sampleRate, self.time, self.loopOffset, numSamples, dataOffset, numBlocks, bytesPerBlock, # per channel self.samplesPerBlock, # per channel bytesInLastBlock, # per channel self.samplesInLastBlock, # per channel self.unk28, self.unk2C, self.unk30, self.unk34, self.unk38, self.unk3C, self.unk40, self.unk44, ) = struct.unpack_from('<B?BB2H16I', file, 0x18) assert dataOffset == 0x68, f'Unexpected STRM data offset ({hex(dataOffset)})' self.waveType = WaveType(waveType) dataOffs = 0x10 + headSize dataMagic, dataSize = struct.unpack_from('<4sI', file, dataOffs) assert dataMagic == b'DATA', f'Incorrect STRM DATA magic ({dataMagic})' data = file[dataOffs + 8 : dataOffs + dataSize] isOneBigLongBlock = (numBlocks == 1 and waveType == WaveType.ADPCM) self.channels = [] for _ in range(numChannels): self.channels.append([]) offs = 0 for bn in range(numBlocks): blockSize = bytesPerBlock if bn == numBlocks - 1: blockSize = bytesInLastBlock if isOneBigLongBlock: blockSize += 4 for cn in range(numChannels): self.channels[cn].append(data[offs : offs + blockSize]) offs += blockSize while offs % 4: offs += 1
[docs] def save(self, *, updateTime=False): """ Generate file data representing this STRM, and then return that data, .unk02, .volume, .priority, .playerID, and .unk07, as a 6-tuple. This matches the parameters of the default class constructor. """ # Figure out the number of blocks, bytes per block, and bytes in # the last block (while also checking that these are consistent # across channels as they should be) numBlocks = bytesPerBlock = bytesInLastBlock = 0 if self.channels: numBlocks = len(self.channels[0]) # All channels should have the same number of blocks for i, blocks in enumerate(self.channels): if len(blocks) != numBlocks: raise ValueError(f'Channels 1 and {i + 1} have' f' different numbers of blocks ({numBlocks} vs' f' {len(blocks)})!') if numBlocks > 0: bytesPerBlock = len(self.channels[0][0]) bytesInLastBlock = len(self.channels[0][-1]) # All channels should have all but the final block be # bytesPerBlock bytes long, and the last one be # bytesInLastBlock long for i, blocks in enumerate(self.channels): for j, b in enumerate(blocks[:-1]): # ignore last block if len(b) != bytesPerBlock: raise ValueError('Detected block size is' f' {bytesPerBlock}, but block {j + 1} in' f' channel {i + 1} is {len(b)} bytes' ' long!') b = blocks[-1] if len(b) != bytesInLastBlock: raise ValueError('Detected last block size is' f' {bytesInLastBlock}, but the last block in' f' channel {i + 1} is {len(b)} bytes' ' long!') # Construct the wave data waveData = bytearray() for blocks in zip(*self.channels): for b in blocks: waveData.extend(b) while len(waveData) % 4: waveData.append(0) # Construct the file data data = bytearray() data.extend(_common.NDS_STD_FILE_HEADER.pack( b'STRM', 0xFEFF, 0x100, 0x68 + len(waveData), 0x10, 2)) if updateTime: self.time = int(1.0 / self.sampleRate * 16756991 / 32) adpcmBlockSizeAdjust = 0 if self.waveType == WaveType.ADPCM and numBlocks == 1: adpcmBlockSizeAdjust = 4 data.extend(struct.pack( '<4sI', b'HEAD', 0x50)) data.extend(struct.pack('<B?BB2H16I', self.waveType, self.isLooped, len(self.channels), self.unk03, self.sampleRate, self.time, self.loopOffset, (numBlocks - 1) * self.samplesPerBlock + self.samplesInLastBlock, 0x68, numBlocks, bytesPerBlock - adpcmBlockSizeAdjust, self.samplesPerBlock, bytesInLastBlock - adpcmBlockSizeAdjust, self.samplesInLastBlock, self.unk28, self.unk2C, self.unk30, self.unk34, self.unk38, self.unk3C, self.unk40, self.unk44, )) data.extend(struct.pack( '<4sI', b'DATA', 8 + len(waveData))) data.extend(waveData) return (bytes(data), self.unk02, self.volume, self.priority, self.playerID, self.unk07)
[docs] def saveToFile(self, filePath, *, updateTime=False): """ Generate file data representing this STRM, and save it to a filesystem file. """ d = self.save(updateTime=updateTime)[0] with open(filePath, 'wb') as f: f.write(d)
def __str__(self): if self.channels: if all(len(c) == len(self.channels[0]) for c in self.channels): num = len(self.channels[0]) word = 'block' if num == 1 else 'blocks' blockInfo = f'of {num} {word} each' else: blockInfo = 'of differing numbers of blocks each' num = len(self.channels) word = 'channel' if num == 1 else 'channels' channelsInfo = f'[{num} {word} {blockInfo}]' else: channelsInfo = '[0 channels]' try: waveType = WaveType(self.waveType).name except Exception: waveType = f'waveType={hex(self.waveType)}' looped = ' looped' if self.isLooped else '' return f'<strm {waveType} {self.sampleRate}Hz {channelsInfo} volume={self.volume}{looped}>' def __repr__(self): c = ['['] if self.channels: c.append('[') if self.channels[0]: c.append(_common.shortBytesRepr(self.channels[0][0])) if len(self.channels[0]) > 1: c.append(', ...') c.append(']') if len(self.channels) > 1: c.append(', ...') c.append(']') c = ''.join(c) return (f'{type(self).__name__}.fromChannels({c}' f', {self.unk02!r}, {self.volume!r}, {self.priority!r}' f', {self.playerID!r}, {self.unk07!r})')