Commit fe7006e3 authored by W. Trevor King's avatar W. Trevor King
Browse files

Replace igor.igorpy parsing with translations from igor.packed.load.

parent 1d1e8810
......@@ -14,89 +14,54 @@ The usual igor folder types are given in the technical reports
PTN003.ifn and TN003.ifn.
"""
from __future__ import absolute_import
import io as _io
import re as _re
__version__='1.0'
import numpy as _numpy
import struct
import numpy
import sys
import re
from .binarywave import MAXDIMS as _MAXDIMS
from .packed import load as _load
from .record.base import UnknownRecord as _UnknownRecord
from .record.folder import FolderStartRecord as _FolderStartRecord
from .record.folder import FolderEndRecord as _FolderEndRecord
from .record.history import HistoryRecord as _HistoryRecord
from .record.history import GetHistoryRecord as _GetHistoryRecord
from .record.history import RecreationRecord as _RecreationRecord
from .record.packedfile import PackedFileRecord as _PackedFileRecord
from .record.procedure import ProcedureRecord as _ProcedureRecord
from .record.wave import WaveRecord as _WaveRecord
from .record.variables import VariablesRecord as _VariablesRecord
__version__='0.10'
decode = lambda s: s.decode(sys.getfilesystemencoding())
PYKEYWORDS = set(('and','as','assert','break','class','continue',
'def','elif','else','except','exec','finally',
'for','global','if','import','in','is','lambda',
'or','pass','print','raise','return','try','with',
'yield'))
PYID = re.compile(r"^[^\d\W]\w*$", re.UNICODE)
PYID = _re.compile(r"^[^\d\W]\w*$", _re.UNICODE)
def valid_identifier(s):
"""Check if a name is a valid identifier"""
return PYID.match(s) and s not in PYKEYWORDS
NUMTYPE = {
1: numpy.complex64,
2: numpy.float32,
3: numpy.complex64,
4: numpy.float64,
5: numpy.complex128,
8: numpy.int8,
16: numpy.int16,
32: numpy.int32,
64+ 8: numpy.uint8,
64+16: numpy.uint16,
64+32: numpy.uint32,
}
ORDER_NUMTYPE = {
1: 'c8',
2: 'f4',
3: 'c8',
4: 'f8',
5: 'c16',
8: 'i1',
16: 'i2',
32: 'i4',
64+ 8: 'u2',
64+16: 'u2',
64+32: 'u4',
}
class IgorObject(object):
""" Parent class for all objects the parser can return """
pass
class Formula(IgorObject):
def __init__(self, formula, value):
self.formula = formula
self.value = value
class Variables(IgorObject):
"""
Contains system numeric variables (e.g., K0) and user numeric and string variables.
"""
def __init__(self, data, order):
version, = struct.unpack(order+"h",data[:2])
if version == 1:
pos = 8
nSysVar, nUserVar, nUserStr \
= struct.unpack(order+"hhh",data[2:pos])
nDepVar, nDepStr = 0, 0
elif version == 2:
pos = 12
nSysVar, nUservar, nUserStr, nDepVar, nDepStr \
= struct.unpack(order+"hhh",data[2:pos])
else:
raise ValueError("Unknown variable record version "+str(version))
self.sysvar, pos = _parse_sys_numeric(nSysVar, order, data, pos)
self.uservar, pos = _parse_user_numeric(nUserVar, order, data, pos)
if version == 1:
self.userstr, pos = _parse_user_string1(nUserStr, order, data, pos)
else:
self.userstr, pos = _parse_user_string2(nUserStr, order, data, pos)
self.depvar, pos = _parse_dep_numeric(nDepVar, order, data, pos)
self.depstr, pos = _parse_dep_string(nDepStr, order, data, pos)
def __init__(self, record):
self.sysvar = record.variables['variables']['sysVars']
self.uservar = record.variables['variables']['userVars']
self.userstr = record.variables['variables']['userStrs']
self.depvar = record.variables['variables'].get('dependentVars', {})
self.depstr = record.variables['variables'].get('dependentStrs', {})
def format(self, indent=0):
return " "*indent+"<Variables: system %d, user %d, dependent %s>"\
%(len(self.sysvar),
......@@ -107,7 +72,8 @@ class History(IgorObject):
"""
Contains the experiment's history as plain text.
"""
def __init__(self, data, order): self.data = data
def __init__(self, data):
self.data = data
def format(self, indent=0):
return " "*indent+"<History>"
......@@ -115,101 +81,33 @@ class Wave(IgorObject):
"""
Contains the data for a wave
"""
def __init__(self, data, order):
version, = struct.unpack(order+'h',data[:2])
if version == 1:
pos = 8
extra_offset,checksum = struct.unpack(order+'ih',data[2:pos])
formula_size = note_size = pic_size = 0
elif version == 2:
pos = 16
extra_offset,note_size,pic_size,checksum \
= struct.unpack(order+'iiih',data[2:pos])
formula_size = 0
elif version == 3:
pos = 20
extra_offset,note_size,formula_size,pic_size,checksum \
= struct.unpack(order+'iiiih',data[2:pos])
elif version == 5:
checksum,extra_offset,formula_size,note_size, \
= struct.unpack(order+'hiii',data[2:16])
Esize = struct.unpack(order+'iiiiiiiii',data[16:52])
textindsize, = struct.unpack('i',data[52:56])
picsize = 0
pos = 64
else:
raise ValueError("unknown wave version "+str(version))
extra_offset += pos
if version in (1,2,3):
type, = struct.unpack(order+'h',data[pos:pos+2])
name = data[pos+6:data.find(chr(0),pos+6,pos+26)]
#print "name3",name,type
data_units = data[pos+34:data.find(chr(0),pos+34,pos+38)]
xaxis = data[pos+38:data.find(chr(0),pos+38,pos+42)]
points, = struct.unpack(order+'i',data[pos+42:pos+46])
hsA,hsB = struct.unpack(order+'dd',data[pos+48:pos+64])
fsValid,fsTop,fsBottom \
= struct.unpack(order+'hdd',data[pos+70:pos+88])
created,_,modified = struct.unpack(order+'IhI',data[pos+98:pos+108])
pos += 110
dims = (points,0,0,0)
sf = (hsA,0,0,0,hsB,0,0,0)
axis_units = (xaxis,"","","")
else: # version is 5
created,modified,points,type \
= struct.unpack(order+'IIih',data[pos+4:pos+18])
name = data[pos+28:data.find(chr(0),pos+28,pos+60)]
#print "name5",name,type
dims = struct.unpack(order+'iiii',data[pos+68:pos+84])
sf = struct.unpack(order+'dddddddd',data[pos+84:pos+148])
data_units = data[pos+148:data.find(chr(0),pos+148,pos+152)]
axis_units = tuple(data[pos+152+4*i
: data.find(chr(0),pos+152+4*i,pos+156+4*i)]
for i in range(4))
fsValid,_,fsTop,fsBottom \
= struct.unpack(order+'hhdd',data[pos+172:pos+192])
pos += 320
if type == 0:
text = data[pos:extra_offset]
textind = numpy.fromstring(data[-textindsize:], order+'i')
textind = numpy.hstack((0,textind))
value = [text[textind[i]:textind[i+1]]
for i in range(len(textind)-1)]
def __init__(self, record):
d = record.wave['wave']
self.name = d['wave_header']['bname']
self.data = d['wData']
self.fs = d['wave_header']['fsValid']
self.fstop = d['wave_header']['topFullScale']
self.fsbottom = d['wave_header']['botFullScale']
if record.wave['version'] in [1,2,3]:
dims = [d['wave_header']['npnts']] + [0]*(_MAXDIMS-1)
sfA = [d['wave_header']['hsA']] + [0]*(_MAXDIMS-1)
sfB = [d['wave_header']['hsB']] + [0]*(_MAXDIMS-1)
self.data_units = [d['wave_header']['dataUnits']]
self.axis_units = [d['wave_header']['xUnits']]
else:
trimdims = tuple(d for d in dims if d)
dtype = order+ORDER_NUMTYPE[type]
size = int(dtype[2:])*numpy.prod(trimdims)
value = numpy.fromstring(data[pos:pos+size],dtype)
value = value.reshape(trimdims)
pos = extra_offset
formula = data[pos:pos+formula_size]
pos += formula_size
notes = data[pos:pos+note_size]
pos += note_size
if version == 5:
offset = numpy.cumsum(numpy.hstack((pos,Esize)))
Edata_units = data[offset[0]:offset[1]]
Eaxis_units = [data[offset[i]:offset[i+1]] for i in range(1,5)]
Eaxis_labels = [data[offset[i]:offset[i+1]] for i in range(5,9)]
if Edata_units: data_units = Edata_units
for i,u in enumerate(Eaxis_units):
if u: axis_units[i] = u
axis_labels = Eaxis_labels
pos = offset[-1]
self.name = decode(name)
self.data = value
self.data_units = data_units
self.axis_units = axis_units
self.fs,self.fstop,self.fsbottom = fsValid,fsTop,fsBottom
self.axis = [numpy.linspace(a,b,n)
for a,b,n in zip(sf[:4],sf[4:],dims)]
self.formula = formula
self.notes = notes
dims = d['wave_header']['nDim']
sfA = d['wave_header']['sfA']
sfB = d['wave_header']['sfB']
# TODO find example with multiple data units
self.data_units = [d['data_units']]
self.axis_units = [d['dimension_units']]
self.data_units.extend(['']*(_MAXDIMS-len(self.data_units)))
self.data_units = tuple(self.data_units)
self.axis_units.extend(['']*(_MAXDIMS-len(self.axis_units)))
self.axis_units = tuple(self.axis_units)
self.axis = [_numpy.linspace(a,b,c) for a,b,c in zip(sfA, sfB, dims)]
self.formula = d.get('formula', '')
self.notes = d.get('note', '')
def format(self, indent=0):
if isinstance(self.data, list):
type,size = "text", "%d"%len(self.data)
......@@ -226,14 +124,16 @@ class Recreation(IgorObject):
"""
Contains the experiment's recreation procedures as plain text.
"""
def __init__(self, data, order): self.data = data
def __init__(self, data):
self.data = data
def format(self, indent=0):
return " "*indent + "<Recreation>"
class Procedure(IgorObject):
"""
Contains the experiment's main procedure window text as plain text.
"""
def __init__(self, data, order): self.data = data
def __init__(self, data):
self.data = data
def format(self, indent=0):
return " "*indent + "<Procedure>"
class GetHistory(IgorObject):
......@@ -245,37 +145,28 @@ class GetHistory(IgorObject):
GetHistory entry simply says that the Recreation has run, and the History
can be restored from the previously saved value.
"""
def __init__(self, data, order): self.data = data
def __init__(self, data):
self.data = data
def format(self, indent=0):
return " "*indent + "<GetHistory>"
class PackedFile(IgorObject):
"""
Contains the data for a procedure file or notebook in packed form.
"""
def __init__(self, data, order): self.data = data
def __init__(self, data):
self.data = data
def format(self, indent=0):
return " "*indent + "<PackedFile>"
class Unknown(IgorObject):
"""
Record type not documented in PTN003/TN003.
"""
def __init__(self, data, order, type):
def __init__(self, data, type):
self.data = data
self.type = type
def format(self, indent=0):
return " "*indent + "<Unknown type %s>"%self.type
class _FolderStart(IgorObject):
"""
Marks the start of a new data folder.
"""
def __init__(self, data, order):
self.name = decode(data[:data.find(chr(0))])
class _FolderEnd(IgorObject):
"""
Marks the end of a data folder.
"""
def __init__(self, data, order): self.data = data
class Folder(IgorObject):
"""
......@@ -320,122 +211,61 @@ class Folder(IgorObject):
children = [r.format(indent=indent+2) for r in self.children]
return u"\n".join([parent]+children)
PARSER = {
1: Variables,
2: History,
3: Wave,
4: Recreation,
5: Procedure,
7: GetHistory,
8: PackedFile,
9: _FolderStart,
10: _FolderEnd,
}
def loads(s, ignore_unknown=True):
def loads(s, **kwargs):
"""Load an igor file from string"""
max = len(s)
pos = 0
ret = []
stream = _io.BytesIO(s)
return load(stream, **kwargs)
def load(filename, **kwargs):
"""Load an igor file"""
try:
packed_experiment = _load(filename)
except ValueError as e:
if e.message.startswith('not enough data for the next record header'):
raise IOError('invalid record header; bad pxp file?')
elif e.message.startswith('not enough data for the next record'):
raise IOError('final record too long; bad pxp file?')
raise
return _convert(packed_experiment, **kwargs)
def _convert(packed_experiment, ignore_unknown=True):
records, filesystem = packed_experiment
stack = [Folder(path=[u'root'])]
while pos < max:
if pos+8 > max:
raise IOError("invalid record header; bad pxp file?")
ignore = ord(s[pos])&0x80
order = '<' if ord(s[pos])&0x77 else '>'
type, version, length = struct.unpack(order+'hhi',s[pos:pos+8])
pos += 8
if pos+length > len(s):
raise IOError("final record too long; bad pxp file?")
data = s[pos:pos+length]
pos += length
if not ignore:
parse = PARSER.get(type, None)
if parse:
record = parse(data, order)
elif ignore_unknown:
for record in records:
if isinstance(record, _UnknownRecord):
if ignore_unknown:
continue
else:
record = Unknown(data=data, order=order, type=type)
if isinstance(record, _FolderStart):
path = stack[-1].path+[record.name]
folder = Folder(path)
stack[-1].append(folder)
stack.append(folder)
elif isinstance(record, _FolderEnd):
stack.pop()
else:
stack[-1].append(record)
r = Unknown(record.data, type=record.header['recordType'])
elif isinstance(record, _GetHistoryRecord):
r = GetHistory(record.text)
elif isinstance(record, _HistoryRecord):
r = History(record.text)
elif isinstance(record, _PackedFileRecord):
r = PackedFile(record.text)
elif isinstance(record, _ProcedureRecord):
r = Procedure(record.text)
elif isinstance(record, _RecreationRecord):
r = Recreation(record.text)
elif isinstance(record, _VariablesRecord):
r = Variables(record)
elif isinstance(record, _WaveRecord):
r = Wave(record)
else:
r = None
if isinstance(record, _FolderStartRecord):
path = stack[-1].path+[record.null_terminated_text]
folder = Folder(path)
stack[-1].append(folder)
stack.append(folder)
elif isinstance(record, _FolderEndRecord):
stack.pop()
elif r is None:
raise NotImplementedError(record)
else:
stack[-1].append(r)
if len(stack) != 1:
raise IOError("FolderStart records do not match FolderEnd records")
return stack[0]
def load(filename, ignore_unknown=True):
"""Load an igor file"""
return loads(open(filename,'rb').read(),
ignore_unknown=ignore_unknown)
# ============== Variable parsing ==============
def _parse_sys_numeric(n, order, data, pos):
values = numpy.fromstring(data[pos:pos+n*4], order+'f')
pos += n*4
var = dict(('K'+str(i),v) for i,v in enumerate(values))
return var, pos
def _parse_user_numeric(n, order, data, pos):
var = {}
for i in range(n):
name = data[pos:data.find(chr(0),pos,pos+32)]
type,numtype,real,imag = struct.unpack(order+"hhdd",data[pos+32:pos+52])
dtype = NUMTYPE[numtype]
if dtype in (numpy.complex64, numpy.complex128):
value = dtype(real+1j*imag)
else:
value = dtype(real)
var[name] = value
pos += 56
return var, pos
def _parse_dep_numeric(n, order, data, pos):
var = {}
for i in range(n):
name = data[pos:data.find(chr(0),pos,pos+32)]
type,numtype,real,imag = struct.unpack(order+"hhdd",data[pos+32:pos+52])
dtype = NUMTYPE[numtype]
if dtype in (numpy.complex64, numpy.complex128):
value = dtype(real+1j*imag)
else:
value = dtype(real)
length, = struct.unpack(order+"h",data[pos+56:pos+58])
var[name] = Formula(data[pos+58:pos+58+length-1], value)
pos += 58+length
return var, pos
def _parse_dep_string(n, order, data, pos):
var = {}
for i in range(n):
name = data[pos:data.find(chr(0),pos,pos+32)]
length, = struct.unpack(order+"h",data[pos+48:pos+50])
var[name] = Formula(data[pos+50:pos+50+length-1], "")
pos += 50+length
return var, pos
def _parse_user_string1(n, order, data, pos):
var = {}
for i in range(n):
name = data[pos:data.find(chr(0),pos,pos+32)]
length, = struct.unpack(order+"h",data[pos+32:pos+34])
value = data[pos+34:pos+34+length]
pos += 34+length
var[name] = value
return var, pos
def _parse_user_string2(n, order, data, pos):
var = {}
for i in range(n):
name = data[pos:data.find(chr(0),pos,pos+32)]
length, = struct.unpack(order+"i",data[pos+32:pos+36])
value = data[pos+36:pos+36+length]
pos += 36+length
var[name] = value
return var, pos
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment